feat(ingest/kafka-connect): add support for Confluent S3 Sink Connector #8298

Merged: 14 commits, Aug 2, 2023
80 changes: 80 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -879,6 +879,80 @@ def _extract_lineages(self):
        return


@dataclass
class ConfluentS3SinkConnector:
    connector_manifest: ConnectorManifest

    def __init__(
        self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
    ) -> None:
        self.connector_manifest = connector_manifest
        self.report = report
        self._extract_lineages()

    @dataclass
    class S3SinkParser:
        target_platform: str
        bucket: str
        topics_dir: str
        topics: Iterable[str]

    def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
        # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
        bucket = connector_manifest.config.get("s3.bucket.name")
        if not bucket:
            raise ValueError(
                "Could not find 's3.bucket.name' in connector configuration"
            )

        # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
        topics_dir = connector_manifest.config.get("topics.dir", "topics")

        return self.S3SinkParser(
            target_platform="s3",
            bucket=bucket,
            topics_dir=topics_dir,
            topics=connector_manifest.topic_names,
        )

    def _extract_lineages(self):
        self.connector_manifest.flow_property_bag = self.connector_manifest.config

        # remove keys, secrets from properties
        secret_properties = [
            "aws.access.key.id",
            "aws.secret.access.key",
            "s3.sse.customer.key",
            "s3.proxy.password",
        ]
        for k in secret_properties:
            if k in self.connector_manifest.flow_property_bag:
                del self.connector_manifest.flow_property_bag[k]

        try:
            parser = self._get_parser(self.connector_manifest)

            lineages: List[KafkaConnectLineage] = list()
            for topic in parser.topics:
                target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"

                lineages.append(
                    KafkaConnectLineage(
                        source_dataset=topic,
                        source_platform="kafka",
                        target_dataset=target_dataset,
                        target_platform=parser.target_platform,
                    )
                )
            self.connector_manifest.lineages = lineages
        except Exception as e:
            self.report.report_warning(
                self.connector_manifest.name, f"Error resolving lineage: {e}"
            )

        return


def transform_connector_config(
connector_config: Dict, provided_configs: List[ProvidedConfig]
) -> None:
@@ -1026,6 +1100,12 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
            connector_manifest = BigQuerySinkConnector(
                connector_manifest=connector_manifest, report=self.report
            ).connector_manifest
        elif (
            connector_manifest.config.get("connector.class")
            == "io.confluent.connect.s3.S3SinkConnector"
        ):
            connector_manifest = ConfluentS3SinkConnector(
                connector_manifest=connector_manifest, report=self.report
            ).connector_manifest
        else:
            self.report.report_dropped(connector_manifest.name)
            logger.warning(
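For orientation, the lineage produced by the new class is one edge per consumed topic, from the Kafka topic to `{s3.bucket.name}/{topics.dir}/{topic}` on S3. The stand-alone helper below is only a sketch of that mapping; its name and plain-dict input are assumptions for illustration and are not part of the source.

# Illustrative sketch only: mirrors ConfluentS3SinkConnector._get_parser /
# _extract_lineages above, using a plain dict in place of ConnectorManifest.
from typing import Dict, Iterable, List, Tuple


def s3_sink_lineage_targets(
    config: Dict[str, str], topics: Iterable[str]
) -> List[Tuple[str, str]]:
    bucket = config["s3.bucket.name"]  # required; the real code reports a warning if missing
    topics_dir = config.get("topics.dir", "topics")  # Confluent default is "topics"
    # One lineage edge per topic: Kafka topic -> S3 "dataset" path.
    return [(topic, f"{bucket}/{topics_dir}/{topic}") for topic in topics]


# With the config used by the integration test below:
# s3_sink_lineage_targets({"s3.bucket.name": "test-bucket"}, ["my-topic"])
# -> [("my-topic", "test-bucket/topics/my-topic")]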
@@ -36,10 +36,11 @@ services:
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
#
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.1
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/
#
@@ -86,3 +87,10 @@ services:
      - MONGO_INITDB_DATABASE=test_db
    volumes:
      - ./../kafka-connect/setup/conf/:/scripts/

  s3mock:
    image: adobe/s3mock:2.13.0
    environment:
      - initialBuckets=test-bucket
    ports:
      - "9090:9090"
@@ -0,0 +1,109 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "ap-southeast-2",
"flush.size": "100",
"tasks.max": "1",
"timezone": "UTC",
"topics": "my-topic",
"store.url": "http://s3mock:9090",
"max.interval": "5000",
"locale": "en_AU",
"key.converter.schemas.enable": "false",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"value.converter.schemas.enable": "false",
"name": "confluent_s3_sink_connector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "test-bucket",
"timestamp.extractor": "Record",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"name": "confluent_s3_sink_connector",
"description": "Sink connector using `io.confluent.connect.s3.S3SinkConnector` plugin."
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "confluent_s3_sink_connector:my-topic",
"type": {
"string": "COMMAND"
}
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,my-topic,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,test-bucket/topics/my-topic,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
}
]
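The output dataset URN in the golden file is just the platform plus the bucket/topics.dir/topic path derived above. For reference, a sketch of building the same URN with DataHub's URN helper (assuming the usual `make_dataset_urn` builder):

# Sketch: reconstruct the golden file's output dataset URN from its parts.
from datahub.emitter.mce_builder import make_dataset_urn

urn = make_dataset_urn(platform="s3", name="test-bucket/topics/my-topic", env="PROD")
# -> urn:li:dataset:(urn:li:dataPlatform:s3,test-bucket/topics/my-topic,PROD)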
@@ -0,0 +1,21 @@
---
run_id: kafka-connect-run

# see https://datahubproject.io/docs/generated/ingestion/sources/kafka-connect for complete documentation
source:
  type: "kafka-connect"
  config:
    connect_uri: "http://localhost:28083"
    connector_patterns:
      allow:
        - confluent_s3_sink_connector
    provided_configs:
      - provider: env
        path_key: S3_ENDPOINT_URL
        value: http://s3mock:9090

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
  type: file
  config:
    filename: "./kafka_connect_mces.json"
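The integration test below runs this recipe through the CLI (`datahub ingest -c ...`). As a sketch, the same recipe could be driven programmatically with DataHub's Pipeline API, with the config values copied from the YAML above:

# Sketch: programmatic equivalent of `datahub ingest -c kafka_connect_s3sink_to_file.yml`.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "run_id": "kafka-connect-run",
        "source": {
            "type": "kafka-connect",
            "config": {
                "connect_uri": "http://localhost:28083",
                "connector_patterns": {"allow": ["confluent_s3_sink_connector"]},
                "provided_configs": [
                    {
                        "provider": "env",
                        "path_key": "S3_ENDPOINT_URL",
                        "value": "http://s3mock:9090",
                    }
                ],
            },
        },
        "sink": {"type": "file", "config": {"filename": "./kafka_connect_mces.json"}},
    }
)
pipeline.run()
pipeline.raise_from_status()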
@@ -9,13 +9,17 @@ source:
    connector_patterns:
      deny:
        - source_mongodb_connector
        - confluent_s3_sink_connector
    provided_configs:
      - provider: env
        path_key: MYSQL_CONNECTION_URL
        value: jdbc:mysql://test_mysql:3306/librarydb
      - provider: env
        path_key: POSTGRES_CONNECTION_URL
        value: jdbc:postgresql://test_postgres:5432/postgres
      - provider: env
        path_key: S3_ENDPOINT_URL
        value: http://s3mock:9090
    convert_lineage_urns_to_lowercase: true
    platform_instance_map: # optional
      mysql: mysql1 # optional
@@ -24,3 +24,4 @@ CONNECT_CONFIG_PROVIDERS=env
CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
S3_ENDPOINT_URL=http://s3mock:9090
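The connector config posted in the test below references `${env:S3_ENDPOINT_URL}`; Kafka Connect resolves it at runtime via the Strimzi EnvVarConfigProvider configured above, and the recipe's `provided_configs` gives DataHub the same value so lineage can be resolved without reading the worker's environment. A simplified sketch of that substitution follows (the real logic is `transform_connector_config` in kafka_connect.py; this stand-alone version is illustrative only):

# Illustrative, simplified version of the ${provider:key} substitution that
# provided_configs drives in the ingestion source.
from typing import Dict, List, Tuple


def resolve_provided_configs(
    config: Dict[str, str], provided: List[Tuple[str, str, str]]
) -> Dict[str, str]:
    # provided entries are (provider, path_key, value),
    # e.g. ("env", "S3_ENDPOINT_URL", "http://s3mock:9090")
    lookups = {
        f"${{{provider}:{path_key}}}": value for provider, path_key, value in provided
    }
    resolved = {}
    for key, raw in config.items():
        for placeholder, replacement in lookups.items():
            if placeholder in raw:
                raw = raw.replace(placeholder, replacement)
        resolved[key] = raw
    return resolved


# resolve_provided_configs(
#     {"store.url": "${env:S3_ENDPOINT_URL}"},
#     [("env", "S3_ENDPOINT_URL", "http://s3mock:9090")],
# )
# -> {"store.url": "http://s3mock:9090"}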
@@ -302,6 +302,40 @@ def loaded_kafka_connect(kafka_connect_runner):
    )
    assert ret.returncode == 0

    # Creating S3 sink connector
    r = requests.post(
        KAFKA_CONNECT_ENDPOINT,
        headers={"Content-Type": "application/json"},
        data=r"""{
            "name": "confluent_s3_sink_connector",
            "config": {
                "aws.access.key.id": "x",
                "aws.secret.access.key": "x",
                "tasks.max": "1",
                "max.interval": 5000,
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "s3.region": "ap-southeast-2",
                "s3.bucket.name": "test-bucket",
                "s3.compression.type": "gzip",
                "store.url": "${env:S3_ENDPOINT_URL}",
                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
                "flush.size": 100,
                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
                "locale": "en_AU",
                "timezone": "UTC",
                "timestamp.extractor": "Record",
                "topics": "my-topic",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "value.converter.schemas.enable": false
            }
        }""",
    )
    r.raise_for_status()
    assert r.status_code == 201

    # Give time for connectors to process the table data
    kafka_connect_runner.wait_until_responsive(
        timeout=30,
@@ -346,6 +380,24 @@ def test_kafka_connect_mongosourceconnect_ingest(
    )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_s3sink_ingest(
    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
    # Run the metadata ingestion pipeline.
    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "kafka_connect_mces.json",
        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
        ignore_paths=[],
    )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_ingest_stateful(