
feat(ingest/kafka-connect): Lineage for Kafka Connect > Snowflake (#8811)

Co-authored-by: Harshal Sheth <[email protected]>
shubhamjagtap639 and hsheth2 authored Sep 23, 2023
1 parent 5c40390 commit 501522d
Showing 5 changed files with 362 additions and 1 deletion.
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/kafka-connect/README.md
@@ -21,4 +21,4 @@ This ingestion source maps the following Source System Concepts to DataHub Concepts
Works only for

- Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph
- Sink connectors: BigQuery
- Sink connectors: BigQuery, Confluent S3, Snowflake
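
As an illustration (not part of the README change): a minimal sketch, in plain Python, of the lineage the new Snowflake sink support reports. The connector class and the snowflake.* property names are the standard ones used in the test further down; the topic, database, schema, and table names here are hypothetical.

# Hypothetical Snowflake sink connector config (all names are illustrative).
config = {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "orders",
    "snowflake.database.name": "analytics_db",
    "snowflake.schema.name": "raw",
    "snowflake.topic2table.map": "orders:orders_raw",
}

# Parse the optional topic-to-table map ("topic1:table1,topic2:table2").
topic2table = dict(
    pair.split(":") for pair in config["snowflake.topic2table.map"].split(",")
)

# One lineage edge per topic: Kafka topic -> database.schema.table in Snowflake.
for topic in config["topics"].split(","):
    table = topic2table.get(topic, topic)
    target = (
        f'{config["snowflake.database.name"]}.'
        f'{config["snowflake.schema.name"]}.{table}'
    )
    print(f"kafka:{topic} -> snowflake:{target}")
# kafka:orders -> snowflake:analytics_db.raw.orders_raw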
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -505,6 +505,7 @@ def get_long_description():
"nifi",
"vertica",
"mode",
"kafka-connect",
]
if plugin
for dependency in plugins[plugin]
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -901,6 +901,108 @@ def _extract_lineages(self):
return


@dataclass
class SnowflakeSinkConnector:
connector_manifest: ConnectorManifest
report: KafkaConnectSourceReport

def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()

@dataclass
class SnowflakeParser:
database_name: str
schema_name: str
topics_to_tables: Dict[str, str]

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_table_name_from_topic_name(self, topic_name: str) -> str:
"""
Convert a topic name to a valid Snowflake table name using the connector's rules.
For details, see
https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics
"""
table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name
# The connector may append the original topic's hash code as a suffix when the
# generated table names for two topics collide; that corner case is not handled here.
# Snowflake recommends choosing topic names that already follow the rules for
# Snowflake identifier names, so such collisions should be rare.
return table_name

def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> SnowflakeParser:
database_name = connector_manifest.config["snowflake.database.name"]
schema_name = connector_manifest.config["snowflake.schema.name"]

# Fetch the user-provided topic-to-table map
provided_topics_to_tables: Dict[str, str] = {}
if connector_manifest.config.get("snowflake.topic2table.map"):
for each in connector_manifest.config["snowflake.topic2table.map"].split(
","
):
topic, table = each.split(":")
provided_topics_to_tables[topic.strip()] = table.strip()

topics_to_tables: Dict[str, str] = {}
# Extract lineage only for topics whose data ingestion has started
for topic in connector_manifest.topic_names:
if topic in provided_topics_to_tables:
# The user explicitly provided the table to map this topic to
topics_to_tables[topic] = provided_topics_to_tables[topic]
else:
# Otherwise the connector derives a valid Snowflake table name from the topic name.
topics_to_tables[topic] = self.get_table_name_from_topic_name(topic)

return self.SnowflakeParser(
database_name=database_name,
schema_name=schema_name,
topics_to_tables=topics_to_tables,
)

def _extract_lineages(self):
self.connector_manifest.flow_property_bag = self.connector_manifest.config

# For the full list of Snowflake sink connector properties, see
# https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector
# Remove private keys and secrets from the properties.
secret_properties = [
"snowflake.private.key",
"snowflake.private.key.passphrase",
"value.converter.basic.auth.user.info",
]
for k in secret_properties:
if k in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag[k]

lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)

for topic, table in parser.topics_to_tables.items():
target_dataset = f"{parser.database_name}.{parser.schema_name}.{table}"
lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_platform=KAFKA,
target_dataset=target_dataset,
target_platform="snowflake",
)
)

self.connector_manifest.lineages = lineages
return


@dataclass
class ConfluentS3SinkConnector:
connector_manifest: ConnectorManifest
@@ -1130,6 +1232,12 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest = ConfluentS3SinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
elif connector_manifest.config.get("connector.class").__eq__(
"com.snowflake.kafka.connector.SnowflakeSinkConnector"
):
connector_manifest = SnowflakeSinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
self.report.report_dropped(connector_manifest.name)
logger.warning(
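
For context, a minimal standalone sketch (not part of the diff) of the topic-name normalization the new get_table_name_from_topic_name method applies when a topic has no explicit snowflake.topic2table.map entry. The topics topic1 and _topic+2 come from the test fixture further down; 2-legged-orders is a made-up example.

import re

def table_name_from_topic(topic_name: str) -> str:
    # Replace characters that are not legal in an unquoted Snowflake identifier.
    table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
    # Prefix an underscore if the name does not start with a letter or underscore.
    if re.match("^[^a-zA-Z_].*", table_name):
        table_name = "_" + table_name
    return table_name

print(table_name_from_topic("topic1"))           # topic1
print(table_name_from_topic("_topic+2"))         # _topic_2  (matches the golden file below)
print(table_name_from_topic("2-legged-orders"))  # _2_legged_orders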
152 changes: 152 additions & 0 deletions metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json
@@ -0,0 +1,152 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"snowflake.database.name": "kafka_db",
"snowflake.schema.name": "kafka_schema",
"snowflake.topic2table.map": "topic1:table1",
"tasks.max": "1",
"topics": "topic1,_topic+2",
"snowflake.user.name": "kafka_connector_user_1",
"name": "snowflake_sink1",
"snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443"
},
"name": "snowflake_sink1",
"description": "Sink connector using `com.snowflake.kafka.connector.SnowflakeSinkConnector` plugin."
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "snowflake_sink1:topic1",
"type": {
"string": "COMMAND"
}
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema.table1,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "snowflake_sink1:_topic+2",
"type": {
"string": "COMMAND"
}
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,_topic+2,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema._topic_2,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-test"
}
}
]
100 changes: 100 additions & 0 deletions metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -534,3 +534,103 @@ def test_kafka_connect_ingest_stateful(
"urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)",
]
assert sorted(deleted_job_urns) == sorted(difference_job_urns)


def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
api_vs_response = {
"http://localhost:28083": {
"method": "GET",
"status_code": 200,
"json": {
"version": "7.4.0-ccs",
"commit": "30969fa33c185e880b9e02044761dfaac013151d",
"kafka_cluster_id": "MDgRZlZhSZ-4fXhwRR79bw",
},
},
}

api_vs_response.update(override_data)

for url in api_vs_response.keys():
request_mock.register_uri(
api_vs_response[url]["method"],
url,
json=api_vs_response[url]["json"],
status_code=api_vs_response[url]["status_code"],
)


@freeze_time(FROZEN_TIME)
def test_kafka_connect_snowflake_sink_ingest(
pytestconfig, tmp_path, mock_time, requests_mock
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
override_data = {
"http://localhost:28083/connectors": {
"method": "GET",
"status_code": 200,
"json": ["snowflake_sink1"],
},
"http://localhost:28083/connectors/snowflake_sink1": {
"method": "GET",
"status_code": 200,
"json": {
"name": "snowflake_sink1",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"snowflake.database.name": "kafka_db",
"snowflake.schema.name": "kafka_schema",
"snowflake.topic2table.map": "topic1:table1",
"tasks.max": "1",
"topics": "topic1,_topic+2",
"snowflake.user.name": "kafka_connector_user_1",
"snowflake.private.key": "rrSnqU=",
"name": "snowflake_sink1",
"snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443",
},
"tasks": [{"connector": "snowflake_sink1", "task": 0}],
"type": "sink",
},
},
"http://localhost:28083/connectors/snowflake_sink1/topics": {
"method": "GET",
"status_code": 200,
"json": {"snowflake_sink1": {"topics": ["topic1", "_topic+2"]}},
},
}

register_mock_api(request_mock=requests_mock, override_data=override_data)

pipeline = Pipeline.create(
{
"run_id": "kafka-connect-test",
"source": {
"type": "kafka-connect",
"config": {
"platform_instance": "connect-instance-1",
"connect_uri": KAFKA_CONNECT_SERVER,
"connector_patterns": {
"allow": [
"snowflake_sink1",
]
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/kafka_connect_snowflake_sink_mces.json",
},
},
}
)

pipeline.run()
pipeline.raise_from_status()
golden_file = "kafka_connect_snowflake_sink_mces_golden.json"

mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json",
golden_path=f"{test_resources_dir}/{golden_file}",
)
