diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index 8d2bae78f671e..c8a4c7a6ab8fa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -879,6 +879,80 @@ def _extract_lineages(self):
         return


+@dataclass
+class ConfluentS3SinkConnector:
+    connector_manifest: ConnectorManifest
+
+    def __init__(
+        self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
+    ) -> None:
+        self.connector_manifest = connector_manifest
+        self.report = report
+        self._extract_lineages()
+
+    @dataclass
+    class S3SinkParser:
+        target_platform: str
+        bucket: str
+        topics_dir: str
+        topics: Iterable[str]
+
+    def _get_parser(self, connector_manifest: ConnectorManifest) -> S3SinkParser:
+        # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#s3
+        bucket = connector_manifest.config.get("s3.bucket.name")
+        if not bucket:
+            raise ValueError(
+                "Could not find 's3.bucket.name' in connector configuration"
+            )
+
+        # https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage
+        topics_dir = connector_manifest.config.get("topics.dir", "topics")
+
+        return self.S3SinkParser(
+            target_platform="s3",
+            bucket=bucket,
+            topics_dir=topics_dir,
+            topics=connector_manifest.topic_names,
+        )
+
+    def _extract_lineages(self):
+        self.connector_manifest.flow_property_bag = self.connector_manifest.config
+
+        # remove keys, secrets from properties
+        secret_properties = [
+            "aws.access.key.id",
+            "aws.secret.access.key",
+            "s3.sse.customer.key",
+            "s3.proxy.password",
+        ]
+        for k in secret_properties:
+            if k in self.connector_manifest.flow_property_bag:
+                del self.connector_manifest.flow_property_bag[k]
+
+        try:
+            parser = self._get_parser(self.connector_manifest)
+
+            lineages: List[KafkaConnectLineage] = list()
+            for topic in parser.topics:
+                target_dataset = f"{parser.bucket}/{parser.topics_dir}/{topic}"
+
+                lineages.append(
+                    KafkaConnectLineage(
+                        source_dataset=topic,
+                        source_platform="kafka",
+                        target_dataset=target_dataset,
+                        target_platform=parser.target_platform,
+                    )
+                )
+            self.connector_manifest.lineages = lineages
+        except Exception as e:
+            self.report.report_warning(
+                self.connector_manifest.name, f"Error resolving lineage: {e}"
+            )
+
+        return
+
+
 def transform_connector_config(
     connector_config: Dict, provided_configs: List[ProvidedConfig]
 ) -> None:
@@ -1026,6 +1100,12 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
                     connector_manifest = BigQuerySinkConnector(
                         connector_manifest=connector_manifest, report=self.report
                     ).connector_manifest
+                elif connector_manifest.config.get(
+                    "connector.class"
+                ) == "io.confluent.connect.s3.S3SinkConnector":
+                    connector_manifest = ConfluentS3SinkConnector(
+                        connector_manifest=connector_manifest, report=self.report
+                    ).connector_manifest
                 else:
                     self.report.report_dropped(connector_manifest.name)
                     logger.warning(
diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
index dbb4f84c0a655..260887ff3d1a0 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
@@ -36,10 +36,11 @@ services:
         #
         confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
         #
+        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.1
+        #
         curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
           | tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
           --strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
-        #
         curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
           | tar -xzf - -C /usr/share/confluent-hub-components/
         #
@@ -86,3 +87,10 @@ services:
       - MONGO_INITDB_DATABASE=test_db
     volumes:
       - ./../kafka-connect/setup/conf/:/scripts/
+
+  s3mock:
+    image: adobe/s3mock:2.13.0
+    environment:
+      - initialBuckets=test-bucket
+    ports:
+      - "9090:9090"
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_mces_golden.json
new file mode 100644
index 0000000000000..c5a76bb85092d
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_mces_golden.json
@@ -0,0 +1,109 @@
+[
+  {
+    "entityType": "dataFlow",
+    "entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "dataFlowInfo",
+    "aspect": {
+      "json": {
+        "customProperties": {
+          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+          "s3.region": "ap-southeast-2",
+          "flush.size": "100",
+          "tasks.max": "1",
+          "timezone": "UTC",
+          "topics": "my-topic",
+          "store.url": "http://s3mock:9090",
+          "max.interval": "5000",
+          "locale": "en_AU",
+          "key.converter.schemas.enable": "false",
+          "s3.compression.type": "gzip",
+          "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+          "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
+          "value.converter.schemas.enable": "false",
+          "name": "confluent_s3_sink_connector",
+          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+          "s3.bucket.name": "test-bucket",
+          "timestamp.extractor": "Record",
+          "key.converter": "org.apache.kafka.connect.json.JsonConverter"
+        },
+        "name": "confluent_s3_sink_connector",
+        "description": "Sink connector using `io.confluent.connect.s3.S3SinkConnector` plugin."
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1635166800000,
+      "runId": "kafka-connect-run"
+    }
+  },
+  {
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInfo",
+    "aspect": {
+      "json": {
+        "customProperties": {},
+        "name": "confluent_s3_sink_connector:my-topic",
+        "type": {
+          "string": "COMMAND"
+        }
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1635166800000,
+      "runId": "kafka-connect-run"
+    }
+  },
+  {
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
+    "changeType": "UPSERT",
+    "aspectName": "dataJobInputOutput",
+    "aspect": {
+      "json": {
+        "inputDatasets": [
+          "urn:li:dataset:(urn:li:dataPlatform:kafka,my-topic,PROD)"
+        ],
+        "outputDatasets": [
+          "urn:li:dataset:(urn:li:dataPlatform:s3,test-bucket/topics/my-topic,PROD)"
+        ]
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1635166800000,
+      "runId": "kafka-connect-run"
+    }
+  },
+  {
+    "entityType": "dataFlow",
+    "entityUrn": "urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+      "json": {
+        "removed": false
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1635166800000,
+      "runId": "kafka-connect-run"
+    }
+  },
+  {
+    "entityType": "dataJob",
+    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,confluent_s3_sink_connector,PROD),my-topic)",
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+      "json": {
+        "removed": false
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1635166800000,
+      "runId": "kafka-connect-run"
+    }
+  }
+]
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_to_file.yml
new file mode 100644
index 0000000000000..90e086eb1e209
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_s3sink_to_file.yml
@@ -0,0 +1,21 @@
+---
+run_id: kafka-connect-run
+
+# see https://datahubproject.io/docs/generated/ingestion/sources/kafka-connect for complete documentation
+source:
+  type: "kafka-connect"
+  config:
+    connect_uri: "http://localhost:28083"
+    connector_patterns:
+      allow:
+        - confluent_s3_sink_connector
+    provided_configs:
+      - provider: env
+        path_key: S3_ENDPOINT_URL
+        value: http://s3mock:9090
+
+# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
+sink:
+  type: file
+  config:
+    filename: "./kafka_connect_mces.json"
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
index 67ec384efcbf8..f2d8dd7b860b7 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml
@@ -9,6 +9,7 @@ source:
     connector_patterns:
       deny:
         - source_mongodb_connector
+        - confluent_s3_sink_connector
     provided_configs:
       - provider: env
         path_key: MYSQL_CONNECTION_URL
@@ -16,6 +17,9 @@ source:
       - provider: env
         path_key: POSTGRES_CONNECTION_URL
         value: jdbc:postgresql://test_postgres:5432/postgres
+      - provider: env
+        path_key: S3_ENDPOINT_URL
+        value: http://s3mock:9090
     convert_lineage_urns_to_lowercase: true
     platform_instance_map: # optional
       mysql: mysql1 # optional
diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
index 7b25ae37b10a6..40d5dcfc9a91a 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
+++ b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
@@ -24,3 +24,4 @@ CONNECT_CONFIG_PROVIDERS=env
 CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
 MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
 POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
+S3_ENDPOINT_URL=http://s3mock:9090
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index 49dada0cddcbb..5f907bb05443c 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -302,6 +302,40 @@ def loaded_kafka_connect(kafka_connect_runner):
     )
     assert ret.returncode == 0

+    # Creating S3 sink connector
+    r = requests.post(
+        KAFKA_CONNECT_ENDPOINT,
+        headers={"Content-Type": "application/json"},
+        data=r"""{
+            "name": "confluent_s3_sink_connector",
+            "config": {
+                "aws.access.key.id": "x",
+                "aws.secret.access.key": "x",
+                "tasks.max": "1",
+                "max.interval": 5000,
+                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+                "s3.region": "ap-southeast-2",
+                "s3.bucket.name": "test-bucket",
+                "s3.compression.type": "gzip",
+                "store.url": "${env:S3_ENDPOINT_URL}",
+                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
+                "flush.size": 100,
+                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
+                "locale": "en_AU",
+                "timezone": "UTC",
+                "timestamp.extractor": "Record",
+                "topics": "my-topic",
+                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "key.converter.schemas.enable": false,
+                "value.converter.schemas.enable": false
+            }
+        }""",
+    )
+    r.raise_for_status()
+    assert r.status_code == 201
+
     # Give time for connectors to process the table data
     kafka_connect_runner.wait_until_responsive(
         timeout=30,
@@ -346,6 +380,24 @@ def test_kafka_connect_mongosourceconnect_ingest(
     )


+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration_batch_1
+def test_kafka_connect_s3sink_ingest(
+    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
+):
+    # Run the metadata ingestion pipeline.
+    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    # Verify the output.
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_mces.json",
+        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
+        ignore_paths=[],
+    )
+
+
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration_batch_1
 def test_kafka_connect_ingest_stateful(