
feat(ingest/kafka-connect): add support for Confluent S3 Sink Connector (#8298)
tusharm authored Aug 2, 2023
1 parent 80f0fde commit e2919cf
Showing 7 changed files with 276 additions and 1 deletion.
80 changes: 80 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -879,6 +879,80 @@ def _extract_lineages(self):
return


@dataclass
class ConfluentS3SinkConnector:
connector_manifest: ConnectorManifest

def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()

@dataclass
class S3SinkParser:
target_platform: str
bucket: str
topics_dir: str
topics: Iterable[str]

def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
# https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
bucket = connector_manifest.config.get("s3.bucket.name")
if not bucket:
raise ValueError(
"Could not find 's3.bucket.name' in connector configuration"
)

# https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
topics_dir = connector_manifest.config.get("topics.dir", "topics")

return self.S3SinkParser(
target_platform="s3",
bucket=bucket,
topics_dir=topics_dir,
topics=connector_manifest.topic_names,
)

def _extract_lineages(self):
self.connector_manifest.flow_property_bag = self.connector_manifest.config

# remove keys, secrets from properties
secret_properties = [
"aws.access.key.id",
"aws.secret.access.key",
"s3.sse.customer.key",
"s3.proxy.password",
]
for k in secret_properties:
if k in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag[k]

try:
parser = self._get_parser(self.connector_manifest)

lineages: List[KafkaConnectLineage] = list()
for topic in parser.topics:
target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"

lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_platform="kafka",
target_dataset=target_dataset,
target_platform=parser.target_platform,
)
)
self.connector_manifest.lineages = lineages
except Exception as e:
self.report.report_warning(
self.connector_manifest.name, f"Error resolving lineage: {e}"
)

return
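For reference, the lineage this new class produces maps each Kafka topic to the S3 path <bucket>/<topics.dir>/<topic>. A minimal, self-contained sketch of that mapping, using a plain dict in place of the ConnectorManifest above (values here are illustrative only):

# Sketch: mirrors the _get_parser/_extract_lineages logic without the DataHub types.
config = {
    "s3.bucket.name": "test-bucket",
    # "topics.dir" falls back to "topics" when not set, as in _get_parser
}
topics = ["my-topic"]

for topic in topics:
    target = f"{config['s3.bucket.name']}/{config.get('topics.dir', 'topics')}/{topic}"
    print(f"kafka:{topic} -> s3:{target}")
# prints: kafka:my-topic -> s3:test-bucket/topics/my-topic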


def transform_connector_config(
connector_config: Dict, provided_configs: List[ProvidedConfig]
) -> None:
@@ -1026,6 +1100,12 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest = BigQuerySinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
elif connector_manifest.config.get("connector.class") == "io.confluent.connect.s3.S3SinkConnector":
connector_manifest = ConfluentS3SinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
self.report.report_dropped(connector_manifest.name)
logger.warning(
@@ -36,10 +36,11 @@ services:
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
#
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.1
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/
#
@@ -86,3 +87,10 @@ services:
- MONGO_INITDB_DATABASE=test_db
volumes:
- ./../kafka-connect/setup/conf/:/scripts/

s3mock:
image: adobe/s3mock:2.13.0
environment:
- initialBuckets=test-bucket
ports:
- "9090:9090"
@@ -0,0 +1,109 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "ap-southeast-2",
"flush.size": "100",
"tasks.max": "1",
"timezone": "UTC",
"topics": "my-topic",
"store.url": "http://s3mock:9090",
"max.interval": "5000",
"locale": "en_AU",
"key.converter.schemas.enable": "false",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"value.converter.schemas.enable": "false",
"name": "confluent_s3_sink_connector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "test-bucket",
"timestamp.extractor": "Record",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"name": "confluent_s3_sink_connector",
"description": "Sink connector using `io.confluent.connect.s3.S3SinkConnector` plugin."
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "confluent_s3_sink_connector:my-topic",
"type": {
"string": "COMMAND"
}
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,my-topic,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,test-bucket/topics/my-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
}
]
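The output dataset URN in the golden file above follows DataHub's standard dataset URN form for the S3 path derived by the connector. A quick way to reproduce it, assuming a local datahub install with the emitter helpers:

from datahub.emitter.mce_builder import make_dataset_urn

urn = make_dataset_urn(platform="s3", name="test-bucket/topics/my-topic", env="PROD")
print(urn)  # urn:li:dataset:(urn:li:dataPlatform:s3,test-bucket/topics/my-topic,PROD)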
@@ -0,0 +1,21 @@
---
run_id: kafka-connect-run

# see https://datahubproject.io/docs/generated/ingestion/sources/kafka-connect for complete documentation
source:
type: "kafka-connect"
config:
connect_uri: "http://localhost:28083"
connector_patterns:
allow:
- confluent_s3_sink_connector
provided_configs:
- provider: env
path_key: S3_ENDPOINT_URL
value: http://s3mock:9090

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: file
config:
filename: "./kafka_connect_mces.json"
@@ -9,13 +9,17 @@ source:
connector_patterns:
deny:
- source_mongodb_connector
- confluent_s3_sink_connector
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
- provider: env
path_key: POSTGRES_CONNECTION_URL
value: jdbc:postgresql://test_postgres:5432/postgres
- provider: env
path_key: S3_ENDPOINT_URL
value: http://s3mock:9090
convert_lineage_urns_to_lowercase: true
platform_instance_map: # optional
mysql: mysql1 # optional
@@ -24,3 +24,4 @@ CONNECT_CONFIG_PROVIDERS=env
CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
S3_ENDPOINT_URL=http://s3mock:9090
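The `${env:...}` placeholders in connector configs (see the `store.url` value in the test below) are resolved by the Strimzi EnvVarConfigProvider configured above. Purely as an illustration of that substitution (this is not the provider's actual implementation):

import os
import re

def resolve_env_placeholders(value: str) -> str:
    # Replace ${env:VAR} with the value of VAR from the environment.
    return re.sub(
        r"\$\{env:([^}]+)\}",
        lambda m: os.environ.get(m.group(1), m.group(0)),
        value,
    )

os.environ["S3_ENDPOINT_URL"] = "http://s3mock:9090"
print(resolve_env_placeholders("${env:S3_ENDPOINT_URL}"))  # http://s3mock:9090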
@@ -302,6 +302,40 @@ def loaded_kafka_connect(kafka_connect_runner):
)
assert ret.returncode == 0

# Creating S3 sink connector
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data=r"""{
"name": "confluent_s3_sink_connector",
"config": {
"aws.access.key.id": "x",
"aws.secret.access.key": "x",
"tasks.max": "1",
"max.interval": 5000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "ap-southeast-2",
"s3.bucket.name": "test-bucket",
"s3.compression.type": "gzip",
"store.url": "${env:S3_ENDPOINT_URL}",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": 100,
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en_AU",
"timezone": "UTC",
"timestamp.extractor": "Record",
"topics": "my-topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}""",
)
r.raise_for_status()
assert r.status_code == 201

# Give time for connectors to process the table data
kafka_connect_runner.wait_until_responsive(
timeout=30,
@@ -346,6 +380,24 @@ def test_kafka_connect_mongosourceconnect_ingest(
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_s3sink_ingest(
loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
ignore_paths=[],
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_ingest_stateful(
