diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index 71282824b40744..385a747ff57d9f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -915,18 +915,15 @@ class S3SinkParser:
         target_platform: str
         bucket: str
         topics_dir: str
-        topics: List[str]
+        topics: Iterable[str]
 
     def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
         # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
         bucket = connector_manifest.config.get("s3.bucket.name")
         if not bucket:
-            raise ValueError(f"Could not find 's3.bucket.name' in connector configuration {self.connector_manifest.name}")
-
-        # https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics
-        topics = connector_manifest.config.get("topics")
-        if not topics:
-            raise ValueError(f"Could not find 'topics' in connector configuration {self.connector_manifest.name}")
+            raise ValueError(
+                "Could not find 's3.bucket.name' in connector configuration"
+            )
 
         # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
         topics_dir = connector_manifest.config.get("topics.dir", "topics")
@@ -935,13 +932,10 @@ def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
             target_platform="s3",
             bucket=bucket,
             topics_dir=topics_dir,
-            topics=[s for s in topics.split(",") if s],
+            topics=connector_manifest.topic_names,
         )
 
     def _extract_lineages(self):
-        lineages: List[KafkaConnectLineage] = list()
-        parser = self._get_parser(self.connector_manifest)
-
         self.connector_manifest.flow_property_bag = self.connector_manifest.config
 
         # remove keys, secrets from properties
@@ -955,18 +949,27 @@ def _extract_lineages(self):
             if k in self.connector_manifest.flow_property_bag:
                 del self.connector_manifest.flow_property_bag[k]
 
-        for topic in parser.topics:
-            target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
-
-            lineages.append(
-                KafkaConnectLineage(
-                    source_dataset=topic,
-                    source_platform="kafka",
-                    target_dataset=target_dataset,
-                    target_platform=parser.target_platform,
+        lineages: List[KafkaConnectLineage] = list()
+        try:
+            parser = self._get_parser(self.connector_manifest)
+
+            for topic in parser.topics:
+                target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
+
+                lineages.append(
+                    KafkaConnectLineage(
+                        source_dataset=topic,
+                        source_platform="kafka",
+                        target_dataset=target_dataset,
+                        target_platform=parser.target_platform,
+                    )
                 )
+            self.connector_manifest.lineages = lineages
+        except Exception as e:
+            self.report.report_warning(
+                self.connector_manifest.name, f"Error resolving lineage: {e}"
             )
-        self.connector_manifest.lineages = lineages
+        return
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index 8efea3ced6254d..2a5490884d2e7c 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -313,6 +313,38 @@ def loaded_kafka_connect(kafka_connect_runner):
     r.raise_for_status()
     assert r.status_code == 201  # Created
 
+    # Creating S3 Sink source
+    r = requests.post(
+        KAFKA_CONNECT_ENDPOINT,
+        headers={"Content-Type": "application/json"},
+        data=r"""{
+            "name": "confluent_s3_sink_connector",
+            "config": {
+                "tasks.max": "1",
+                "max.interval": 5000,
+                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+                "s3.region": "ap-southeast-2",
+                "s3.bucket.name": "test-bucket",
+                "s3.compression.type": "gzip",
+                "store.url": "${env:S3_ENDPOINT_URL}",
+                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+                "flush.size": 100,
+                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
+                "locale": "en_AU",
+                "timezone": "UTC",
+                "timestamp.extractor": "Record",
+                "topics": "my-topic",
+                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "key.converter.schemas.enable": false,
+                "value.converter.schemas.enable": false
+            }
+        }""",
+    )
+    r.raise_for_status()
+    assert r.status_code == 201
+
     # Give time for connectors to process the table data
     time.sleep(60)
 
@@ -353,6 +385,24 @@ def test_kafka_connect_mongosourceconnect_ingest(
     )
 
 
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration_batch_1
+def test_kafka_connect_s3sink_ingest(
+    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
+):
+    # Run the metadata ingestion pipeline.
+    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    # Verify the output.
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_mces.json",
+        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
+        ignore_paths=[],
+    )
+
+
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration_batch_1
 def test_kafka_connect_ingest_stateful(
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect_s3sink.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect_s3sink.py
deleted file mode 100644
index d7823fd49c5494..00000000000000
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect_s3sink.py
+++ /dev/null
@@ -1,105 +0,0 @@
-import time
-
-import pytest
-import requests
-from freezegun import freeze_time
-
-from tests.test_helpers import mce_helpers
-from tests.test_helpers.click_helpers import run_datahub_cmd
-from tests.test_helpers.docker_helpers import wait_for_port
-
-FROZEN_TIME = "2021-10-25 13:00:00"
-GMS_PORT = 8080
-GMS_SERVER = f"http://localhost:{GMS_PORT}"
-KAFKA_CONNECT_SERVER = "http://localhost:28083"
-KAFKA_CONNECT_ENDPOINT = f"{KAFKA_CONNECT_SERVER}/connectors"
-
-
-@pytest.fixture(scope="module")
-def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir):
-    test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"
-
-    docker_compose_file = [
-        str(test_resources_dir_kafka / "docker-compose.yml"),
-        str(test_resources_dir / "docker-compose.override.yml"),
-    ]
-
-    with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
-        wait_for_port(docker_services, "test_broker", 29092, timeout=120)
-        wait_for_port(docker_services, "test_connect", 28083, timeout=120)
-        docker_services.wait_until_responsive(
-            timeout=60,
-            pause=1,
-            check=lambda: requests.get(
-                KAFKA_CONNECT_ENDPOINT,
-            ).status_code
-            == 200,
-        )
-        yield docker_services
-
-
-@pytest.fixture(scope="module")
-def test_resources_dir(pytestconfig):
-    return pytestconfig.rootpath / "tests/integration/kafka-connect"
-
-
-@pytest.fixture(scope="module")
-def loaded_kafka_connect_with_s3sink_connector(kafka_connect_runner):
-    # Set up the container.
-    time.sleep(10)
-
-    # Creating S3 Sink source
-    r = requests.post(
-        KAFKA_CONNECT_ENDPOINT,
-        headers={"Content-Type": "application/json"},
-        data=r"""{
-            "name": "confluent_s3_sink_connector",
-            "config": {
-                "tasks.max": "1",
-                "max.interval": 5000,
-                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
-                "s3.region": "ap-southeast-2",
-                "s3.bucket.name": "test-bucket",
-                "s3.compression.type": "gzip",
-                "store.url": "${env:S3_ENDPOINT_URL}",
-                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
-                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
-                "flush.size": 100,
-                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
-                "locale": "en_AU",
-                "timezone": "UTC",
-                "timestamp.extractor": "Record",
-                "topics": "my-topic",
-                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
-                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-                "key.converter.schemas.enable": false,
-                "value.converter.schemas.enable": false
-            }
-        }""",
-    )
-    r.raise_for_status()
-    assert r.status_code == 201
-
-    # Give time for connectors to process the table data
-    time.sleep(60)
-
-
-@freeze_time(FROZEN_TIME)
-@pytest.mark.integration_batch_1
-def test_kafka_connect_s3sink_ingest(
-    loaded_kafka_connect_with_s3sink_connector,
-    pytestconfig,
-    tmp_path,
-    test_resources_dir,
-):
-    # Run the metadata ingestion pipeline.
-    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
-    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
-
-    # Verify the output.
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / "kafka_connect_mces.json",
-        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
-        ignore_paths=[],
-    )
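
Side note on the lineage shape this change produces: the rewritten _extract_lineages maps each Kafka topic consumed by the S3 sink connector to a target dataset of the form <s3.bucket.name>/<topics.dir>/<topic>. Below is a minimal standalone sketch of that mapping; it assumes nothing beyond the config keys visible in the diff, and the helper name s3_sink_lineage and its tuple return type are illustrative only, not part of the DataHub API.

from typing import Iterable, List, Tuple


def s3_sink_lineage(
    bucket: str, topics_dir: str, topics: Iterable[str]
) -> List[Tuple[str, str]]:
    # Each (source topic, target S3 path) pair mirrors the
    # target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}" line above.
    return [(topic, f"{bucket}/{topics_dir}/{topic}") for topic in topics]


# Using the connector config from the integration test: s3.bucket.name=test-bucket,
# topics.dir left at its "topics" default, topics=my-topic.
assert s3_sink_lineage("test-bucket", "topics", ["my-topic"]) == [
    ("my-topic", "test-bucket/topics/my-topic")
]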