From 501522d891a4c608784e0c92c32b99d67e80f4b0 Mon Sep 17 00:00:00 2001
From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com>
Date: Sat, 23 Sep 2023 05:42:48 +0530
Subject: [PATCH] feat(ingest/kafka-connect): Lineage for Kafka Connect >
 Snowflake (#8811)

Co-authored-by: Harshal Sheth
---
 .../docs/sources/kafka-connect/README.md      |   2 +-
 metadata-ingestion/setup.py                   |   1 +
 .../datahub/ingestion/source/kafka_connect.py | 108 +++++++++++++
 ...ka_connect_snowflake_sink_mces_golden.json | 152 ++++++++++++++++++
 .../kafka-connect/test_kafka_connect.py       | 100 ++++++++++++
 5 files changed, 362 insertions(+), 1 deletion(-)
 create mode 100644 metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json

diff --git a/metadata-ingestion/docs/sources/kafka-connect/README.md b/metadata-ingestion/docs/sources/kafka-connect/README.md
index 5031bff5a3fac..e4f64c62914c5 100644
--- a/metadata-ingestion/docs/sources/kafka-connect/README.md
+++ b/metadata-ingestion/docs/sources/kafka-connect/README.md
@@ -21,4 +21,4 @@ This ingestion source maps the following Source System Concepts to DataHub Concepts
 Works only for
 
 - Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph
-- Sink connectors: BigQuery
+- Sink connectors: BigQuery, Confluent S3, Snowflake
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 10e6ff554d9f8..a0d16aa92ad9b 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -505,6 +505,7 @@ def get_long_description():
             "nifi",
             "vertica",
             "mode",
+            "kafka-connect",
         ]
         if plugin
         for dependency in plugins[plugin]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index b3fa5e3401c07..f3344782917ab 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -901,6 +901,108 @@ def _extract_lineages(self):
         return
 
 
+@dataclass
+class SnowflakeSinkConnector:
+    connector_manifest: ConnectorManifest
+    report: KafkaConnectSourceReport
+
+    def __init__(
+        self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
+    ) -> None:
+        self.connector_manifest = connector_manifest
+        self.report = report
+        self._extract_lineages()
+
+    @dataclass
+    class SnowflakeParser:
+        database_name: str
+        schema_name: str
+        topics_to_tables: Dict[str, str]
+
+    def report_warning(self, key: str, reason: str) -> None:
+        logger.warning(f"{key}: {reason}")
+        self.report.report_warning(key, reason)
+
+    def get_table_name_from_topic_name(self, topic_name: str) -> str:
+        """
+        Convert a topic name to a valid Snowflake table name, following the
+        connector's rules. For details, see
+        https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics
+        """
+        table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
+        if re.match("^[^a-zA-Z_].*", table_name):
+            table_name = "_" + table_name
+        # The connector may append the original topic name's hash code as a suffix
+        # if the generated table names for two topics collide; that corner case is
+        # not handled here. Snowflake recommends choosing topic names that already
+        # follow the rules for Snowflake identifier names, so such collisions
+        # should be rare in practice.
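+        # For example (illustrative topic names): "topic1" -> "topic1" (already
+        # valid), "_topic+2" -> "_topic_2" ("+" replaced by "_"), and
+        # "99topic" -> "_99topic" (a leading digit gets a "_" prefix).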
+        return table_name
+
+    def get_parser(
+        self,
+        connector_manifest: ConnectorManifest,
+    ) -> SnowflakeParser:
+        database_name = connector_manifest.config["snowflake.database.name"]
+        schema_name = connector_manifest.config["snowflake.schema.name"]
+
+        # Fetch the user-provided topic-to-table map, if any.
+        provided_topics_to_tables: Dict[str, str] = {}
+        if connector_manifest.config.get("snowflake.topic2table.map"):
+            for each in connector_manifest.config["snowflake.topic2table.map"].split(
+                ","
+            ):
+                topic, table = each.split(":")
+                provided_topics_to_tables[topic.strip()] = table.strip()
+
+        topics_to_tables: Dict[str, str] = {}
+        # Extract lineage only for those topics whose data ingestion has started.
+        for topic in connector_manifest.topic_names:
+            if topic in provided_topics_to_tables:
+                # The user explicitly mapped this topic to a table.
+                topics_to_tables[topic] = provided_topics_to_tables[topic]
+            else:
+                # Otherwise the connector derives a valid Snowflake table name
+                # from the topic name.
+                topics_to_tables[topic] = self.get_table_name_from_topic_name(topic)
+
+        return self.SnowflakeParser(
+            database_name=database_name,
+            schema_name=schema_name,
+            topics_to_tables=topics_to_tables,
+        )
+
+    def _extract_lineages(self):
+        self.connector_manifest.flow_property_bag = self.connector_manifest.config
+
+        # For the full list of Snowflake sink connector properties, see
+        # https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector
+        # Remove private keys and other secrets from the flow properties.
+        secret_properties = [
+            "snowflake.private.key",
+            "snowflake.private.key.passphrase",
+            "value.converter.basic.auth.user.info",
+        ]
+        for k in secret_properties:
+            if k in self.connector_manifest.flow_property_bag:
+                del self.connector_manifest.flow_property_bag[k]
+
+        lineages: List[KafkaConnectLineage] = list()
+        parser = self.get_parser(self.connector_manifest)
+
+        for topic, table in parser.topics_to_tables.items():
+            target_dataset = f"{parser.database_name}.{parser.schema_name}.{table}"
+            lineages.append(
+                KafkaConnectLineage(
+                    source_dataset=topic,
+                    source_platform=KAFKA,
+                    target_dataset=target_dataset,
+                    target_platform="snowflake",
+                )
+            )
+
+        self.connector_manifest.lineages = lineages
+        return
+
+
 @dataclass
 class ConfluentS3SinkConnector:
     connector_manifest: ConnectorManifest
@@ -1130,6 +1232,12 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
                 connector_manifest = ConfluentS3SinkConnector(
                     connector_manifest=connector_manifest, report=self.report
                 ).connector_manifest
+            elif connector_manifest.config.get("connector.class") == (
+                "com.snowflake.kafka.connector.SnowflakeSinkConnector"
+            ):
+                connector_manifest = SnowflakeSinkConnector(
+                    connector_manifest=connector_manifest, report=self.report
+                ).connector_manifest
             else:
                 self.report.report_dropped(connector_manifest.name)
                 logger.warning(
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json
new file mode 100644
index 0000000000000..76d49cebe5ae3
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json
@@ -0,0 +1,152 @@
+[
+{
+    "entityType": "dataFlow",
+    "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "dataFlowInfo",
+    "aspect": {
+        "json": {
+            "customProperties": {
+                "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+                "snowflake.database.name": "kafka_db",
+                "snowflake.schema.name": "kafka_schema",
+                "snowflake.topic2table.map": "topic1:table1",
+                "tasks.max": "1",
+                "topics": "topic1,_topic+2",
+                "snowflake.user.name": "kafka_connector_user_1",
+                "name": "snowflake_sink1",
+                "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443"
+            },
+            "name": "snowflake_sink1",
+            "description": "Sink connector using `com.snowflake.kafka.connector.SnowflakeSinkConnector` plugin."
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInfo",
+    "aspect": {
+        "json": {
+            "customProperties": {},
+            "name": "snowflake_sink1:topic1",
+            "type": {
+                "string": "COMMAND"
+            }
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInputOutput",
+    "aspect": {
+        "json": {
+            "inputDatasets": [
+                "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD)"
+            ],
+            "outputDatasets": [
+                "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema.table1,PROD)"
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInfo",
+    "aspect": {
+        "json": {
+            "customProperties": {},
+            "name": "snowflake_sink1:_topic+2",
+            "type": {
+                "string": "COMMAND"
+            }
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInputOutput",
+    "aspect": {
+        "json": {
+            "inputDatasets": [
+                "urn:li:dataset:(urn:li:dataPlatform:kafka,_topic+2,PROD)"
+            ],
+            "outputDatasets": [
+                "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema._topic_2,PROD)"
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataFlow",
+    "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+},
+{
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+        "json": {
+            "removed": false
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1635166800000,
+        "runId": "kafka-connect-test"
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index 5f907bb05443c..48063908e624f 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -534,3 +534,103 @@ def test_kafka_connect_ingest_stateful(
         "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)",
     ]
     assert sorted(deleted_job_urns) == sorted(difference_job_urns)
+
+
+def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
+    api_vs_response = {
+        "http://localhost:28083": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {
+                "version": "7.4.0-ccs",
+                "commit": "30969fa33c185e880b9e02044761dfaac013151d",
+                "kafka_cluster_id": "MDgRZlZhSZ-4fXhwRR79bw",
+            },
+        },
+    }
+
+    api_vs_response.update(override_data)
+
+    for url in api_vs_response.keys():
+        request_mock.register_uri(
+            api_vs_response[url]["method"],
+            url,
+            json=api_vs_response[url]["json"],
+            status_code=api_vs_response[url]["status_code"],
+        )
+
+
+@freeze_time(FROZEN_TIME)
+def test_kafka_connect_snowflake_sink_ingest(
+    pytestconfig, tmp_path, mock_time, requests_mock
+):
+    test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
+    override_data = {
+        "http://localhost:28083/connectors": {
+            "method": "GET",
+            "status_code": 200,
+            "json": ["snowflake_sink1"],
+        },
+        "http://localhost:28083/connectors/snowflake_sink1": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {
+                "name": "snowflake_sink1",
+                "config": {
+                    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+                    "snowflake.database.name": "kafka_db",
+                    "snowflake.schema.name": "kafka_schema",
+                    "snowflake.topic2table.map": "topic1:table1",
+                    "tasks.max": "1",
+                    "topics": "topic1,_topic+2",
+                    "snowflake.user.name": "kafka_connector_user_1",
+                    "snowflake.private.key": "rrSnqU=",
+                    "name": "snowflake_sink1",
+                    "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443",
+                },
+                "tasks": [{"connector": "snowflake_sink1", "task": 0}],
+                "type": "sink",
+            },
+        },
+        "http://localhost:28083/connectors/snowflake_sink1/topics": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {"snowflake_sink1": {"topics": ["topic1", "_topic+2"]}},
+        },
+    }
+
+    register_mock_api(request_mock=requests_mock, override_data=override_data)
+
+    pipeline = Pipeline.create(
+        {
+            "run_id": "kafka-connect-test",
+            "source": {
+                "type": "kafka-connect",
+                "config": {
+                    "platform_instance": "connect-instance-1",
+                    "connect_uri": KAFKA_CONNECT_SERVER,
+                    "connector_patterns": {
+                        "allow": [
+                            "snowflake_sink1",
+                        ]
+                    },
+                },
+            },
+            "sink": {
+                "type": "file",
+                "config": {
+                    "filename": f"{tmp_path}/kafka_connect_snowflake_sink_mces.json",
+                },
+            },
+        }
+    )
+
+    pipeline.run()
+    pipeline.raise_from_status()
+    golden_file = "kafka_connect_snowflake_sink_mces_golden.json"
+
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json",
+        golden_path=f"{test_resources_dir}/{golden_file}",
+    )