
feat(ingest/kafka-connect): Fix review comments
tusharm committed Jul 10, 2023
1 parent 8dab660 commit c5a9644
Showing 6 changed files with 82 additions and 131 deletions.
metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py (43 changes: 23 additions & 20 deletions)
@@ -915,18 +915,15 @@ class S3SinkParser:
         target_platform: str
         bucket: str
         topics_dir: str
-        topics: List[str]
+        topics: Iterable[str]
 
     def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
         # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
         bucket = connector_manifest.config.get("s3.bucket.name")
         if not bucket:
-            raise ValueError(f"Could not find 's3.bucket.name' in connector configuration {self.connector_manifest.name}")
-
-        # https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics
-        topics = connector_manifest.config.get("topics")
-        if not topics:
-            raise ValueError(f"Could not find 'topics' in connector configuration {self.connector_manifest.name}")
+            raise ValueError(
+                "Could not find 's3.bucket.name' in connector configuration"
+            )
 
         # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
         topics_dir = connector_manifest.config.get("topics.dir", "topics")
@@ -935,13 +932,10 @@ def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
             target_platform="s3",
             bucket=bucket,
             topics_dir=topics_dir,
-            topics=[s for s in topics.split(",") if s],
+            topics=connector_manifest.topic_names,
         )
 
     def _extract_lineages(self):
-        lineages: List[KafkaConnectLineage] = list()
-        parser = self._get_parser(self.connector_manifest)
-
         self.connector_manifest.flow_property_bag = self.connector_manifest.config
 
         # remove keys, secrets from properties
@@ -955,18 +949,27 @@ def _extract_lineages(self):
             if k in self.connector_manifest.flow_property_bag:
                 del self.connector_manifest.flow_property_bag[k]
 
-        for topic in parser.topics:
-            target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
-
-            lineages.append(
-                KafkaConnectLineage(
-                    source_dataset=topic,
-                    source_platform="kafka",
-                    target_dataset=target_dataset,
-                    target_platform=parser.target_platform,
-                )
-            )
-        self.connector_manifest.lineages = lineages
+        try:
+            parser = self._get_parser(self.connector_manifest)
+
+            lineages: List[KafkaConnectLineage] = list()
+            for topic in parser.topics:
+                target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
+
+                lineages.append(
+                    KafkaConnectLineage(
+                        source_dataset=topic,
+                        source_platform="kafka",
+                        target_dataset=target_dataset,
+                        target_platform=parser.target_platform,
+                    )
+                )
+            self.connector_manifest.lineages = lineages
+        except Exception as e:
+            self.report.report_warning(
+                self.connector_manifest.name, f"Error resolving lineage: {e}"
+            )
 
         return


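For reference, the reworked _extract_lineages above maps each Kafka topic consumed by the sink to an S3 path of the form <bucket>/<topics.dir>/<topic>. A minimal, self-contained sketch of that mapping (not part of this commit; the class and function names below are illustrative):

    from dataclasses import dataclass
    from typing import Iterable, List


    @dataclass
    class TopicToS3Lineage:
        source_dataset: str
        source_platform: str
        target_dataset: str
        target_platform: str


    def s3_sink_lineages(
        bucket: str, topics_dir: str, topics: Iterable[str]
    ) -> List[TopicToS3Lineage]:
        # Same target-path layout as the connector: <bucket>/<topics.dir>/<topic>
        return [
            TopicToS3Lineage(
                source_dataset=topic,
                source_platform="kafka",
                target_dataset=f"{bucket}/{topics_dir}/{topic}",
                target_platform="s3",
            )
            for topic in topics
        ]


    if __name__ == "__main__":
        # With the integration test's connector config ("s3.bucket.name": "test-bucket",
        # default "topics.dir" of "topics", topic "my-topic") this prints:
        #   my-topic -> test-bucket/topics/my-topic
        for lineage in s3_sink_lineages("test-bucket", "topics", ["my-topic"]):
            print(f"{lineage.source_dataset} -> {lineage.target_dataset}")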
kafka_connect_s3sink_mces_golden.json
@@ -34,7 +34,7 @@
     },
     "systemMetadata": {
         "lastObserved": 1635166800000,
-        "runId": "kafka-connect-s3sink-run"
+        "runId": "kafka-connect-run"
     }
 },
 {
@@ -53,7 +53,7 @@
     },
     "systemMetadata": {
         "lastObserved": 1635166800000,
-        "runId": "kafka-connect-s3sink-run"
+        "runId": "kafka-connect-run"
     }
 },
 {
@@ -73,7 +73,7 @@
     },
     "systemMetadata": {
         "lastObserved": 1635166800000,
-        "runId": "kafka-connect-s3sink-run"
+        "runId": "kafka-connect-run"
     }
 },
 {
@@ -88,7 +88,7 @@
     },
     "systemMetadata": {
         "lastObserved": 1635166800000,
-        "runId": "kafka-connect-s3sink-run"
+        "runId": "kafka-connect-run"
     }
 },
 {
@@ -103,7 +103,7 @@
     },
     "systemMetadata": {
         "lastObserved": 1635166800000,
-        "runId": "kafka-connect-s3sink-run"
+        "runId": "kafka-connect-run"
     }
 }
 ]
kafka_connect_s3sink_to_file.yml
@@ -1,5 +1,5 @@
 ---
-run_id: kafka-connect-s3sink-run
+run_id: kafka-connect-run
 
 # see https://datahubproject.io/docs/generated/ingestion/sources/kafka-connect for complete documentation
 source:
@@ -9,6 +9,7 @@ source:
     connector_patterns:
       deny:
         - source_mongodb_connector
+        - confluent_s3_sink_connector
     provided_configs:
       - provider: env
         path_key: MYSQL_CONNECTION_URL
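connector_patterns is the kafka-connect source's allow/deny filter on connector names, so the deny entry added above presumably keeps the new S3 sink connector out of this recipe's existing golden output. A rough sketch of the filtering effect (full-match regex semantics are an assumption here, not necessarily this module's exact behaviour):

    import re
    from typing import List


    def connector_allowed(name: str, deny: List[str]) -> bool:
        # Deny wins: skip any connector whose name matches a deny pattern.
        return not any(re.fullmatch(pattern, name) for pattern in deny)


    deny_patterns = ["source_mongodb_connector", "confluent_s3_sink_connector"]
    assert not connector_allowed("confluent_s3_sink_connector", deny_patterns)
    assert connector_allowed("mysql_source_connector", deny_patterns)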
@@ -313,6 +313,40 @@ def loaded_kafka_connect(kafka_connect_runner):
     r.raise_for_status()
     assert r.status_code == 201  # Created
 
+    # Creating S3 Sink source
+    r = requests.post(
+        KAFKA_CONNECT_ENDPOINT,
+        headers={"Content-Type": "application/json"},
+        data=r"""{
+            "name": "confluent_s3_sink_connector",
+            "config": {
+                "aws.access.key.id": "x",
+                "aws.secret.access.key": "x",
+                "tasks.max": "1",
+                "max.interval": 5000,
+                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+                "s3.region": "ap-southeast-2",
+                "s3.bucket.name": "test-bucket",
+                "s3.compression.type": "gzip",
+                "store.url": "${env:S3_ENDPOINT_URL}",
+                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+                "flush.size": 100,
+                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
+                "locale": "en_AU",
+                "timezone": "UTC",
+                "timestamp.extractor": "Record",
+                "topics": "my-topic",
+                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "key.converter.schemas.enable": false,
+                "value.converter.schemas.enable": false
+            }
+        }""",
+    )
+    r.raise_for_status()
+    assert r.status_code == 201
+
     # Give time for connectors to process the table data
     time.sleep(60)
 
@@ -353,6 +387,24 @@ def test_kafka_connect_mongosourceconnect_ingest(
     )
 
 
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration_batch_1
+def test_kafka_connect_s3sink_ingest(
+    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
+):
+    # Run the metadata ingestion pipeline.
+    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    # Verify the output.
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_mces.json",
+        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
+        ignore_paths=[],
+    )
+
+
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration_batch_1
 def test_kafka_connect_ingest_stateful(
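As an aside, the loaded_kafka_connect fixture above registers the S3 sink connector and then sleeps for a fixed 60 seconds; Kafka Connect's standard REST API also exposes GET /connectors/<name>/status, which can be polled until the connector reports RUNNING. A small helper sketch (not part of this commit) reusing the test's KAFKA_CONNECT_ENDPOINT and connector name:

    import time

    import requests


    def wait_until_running(endpoint: str, connector: str, timeout: float = 60.0) -> None:
        # Poll GET <endpoint>/<connector>/status until the connector state is RUNNING.
        deadline = time.time() + timeout
        while time.time() < deadline:
            resp = requests.get(f"{endpoint}/{connector}/status")
            if resp.ok and resp.json().get("connector", {}).get("state") == "RUNNING":
                return
            time.sleep(2)
        raise TimeoutError(f"{connector} did not reach RUNNING within {timeout}s")


    # Example: wait_until_running(KAFKA_CONNECT_ENDPOINT, "confluent_s3_sink_connector")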

This file was deleted.
