From 3681e1a12858eff0c81ec23d3b301dc62c9fdd40 Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Fri, 18 Aug 2023 18:51:14 +0530
Subject: [PATCH] docs(ingest/kafka-connect): add details on platform instance
 mapping (#8654)

---
 .../sources/kafka-connect/kafka-connect.md   | 55 +++++++++++++++++++
 .../kafka-connect/kafka-connect_recipe.yml   | 12 ++--
 .../ingestion/extractor/json_schema_util.py  |  1 +
 3 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
index 9d400460407c8..03bcef70e1860 100644
--- a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
+++ b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect.md
@@ -1,5 +1,60 @@
## Advanced Configurations

### Working with Platform Instances

If you have multiple instances of Kafka, or of the source/sink systems referenced in your `kafka-connect` setup, you need to configure a platform instance for these systems in the `kafka-connect` recipe so that correct lineage edges are generated. The `platform_instance` must already be set in the recipes of the original source/sink systems. Refer to the document [Working with Platform Instances](https://datahubproject.io/docs/platform-instances) to learn more about platform instances.

There are two ways to declare a source/sink system's `platform_instance` in the `kafka-connect` recipe. If a single instance of a platform is used across all `kafka-connect` connectors, you can use `platform_instance_map` to specify which platform instance to use for that platform when constructing URNs for lineage.

Example:
```yml
  # Map of platform name to platform instance
  platform_instance_map:
    snowflake: snowflake_platform_instance
    mysql: mysql_platform_instance
```

If multiple instances of a platform are used across the `kafka-connect` connectors, you need to specify which platform instance to use, per platform, for every connector.

#### Example - Multiple MySQL Source Connectors, each reading from a different MySQL instance
```yml
  # Map of platform name to platform instance, per connector
  connect_to_platform_map:
    mysql_connector1:
      mysql: mysql_instance1

    mysql_connector2:
      mysql: mysql_instance2
```
Here `mysql_connector1` and `mysql_connector2` are the names of the MySQL source connectors as defined in the `kafka-connect` connector config.

#### Example - Multiple MySQL Source Connectors, each reading from a different MySQL instance and writing to a different Kafka cluster
```yml
  connect_to_platform_map:
    mysql_connector1:
      mysql: mysql_instance1
      kafka: kafka_instance1

    mysql_connector2:
      mysql: mysql_instance2
      kafka: kafka_instance2
```
You can also use a combination of `platform_instance_map` and `connect_to_platform_map` in your recipe. Note that the `platform_instance` specified for a connector in `connect_to_platform_map` always takes precedence, even if a `platform_instance` for the same platform is set in `platform_instance_map`, as the sketch below illustrates.
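For instance, the following combined configuration (a minimal sketch; all connector and instance names here are hypothetical) sets default instances for every connector via `platform_instance_map` and overrides the MySQL instance for one specific connector via `connect_to_platform_map`:

```yml
  # Defaults used by every connector without a per-connector override
  platform_instance_map:
    kafka: kafka_default_instance
    mysql: mysql_default_instance

  # Per-connector overrides; these take precedence over platform_instance_map
  connect_to_platform_map:
    mysql_connector2:
      mysql: mysql_instance2
```

With this configuration, lineage for `mysql_connector2` uses `mysql_instance2`, while all other connectors fall back to the defaults.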
If you do not use `platform_instance` in the original source/sink recipes, you do not need to specify it in the configurations above.

Note that you do not need to specify a `platform_instance` for BigQuery.

#### Example - Multiple BigQuery Sink Connectors, each writing to a different Kafka cluster
```yml
  connect_to_platform_map:
    bigquery_connector1:
      kafka: kafka_instance1

    bigquery_connector2:
      kafka: kafka_instance2
```

### Provided Configurations from External Sources

Kafka Connect supports pluggable configuration providers, which can load configuration data from external sources at runtime. These values are not available to the DataHub ingestion source through the Kafka Connect APIs. If you use such provided configurations to specify connection URLs (database, etc.) in a Kafka Connect connector configuration, then you will also need to add them to the `provided_configs` section of the recipe for DataHub to generate correct lineage; a sketch of such a section follows this patch.

```yml
diff --git a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
index f5e33e661622d..cacbda5ca078a 100644
--- a/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
+++ b/metadata-ingestion/docs/sources/kafka-connect/kafka-connect_recipe.yml
@@ -3,14 +3,16 @@ source:
   config:
     # Coordinates
     connect_uri: "http://localhost:8083"
-
+
     # Credentials
     username: admin
     password: password
 
     # Optional
-    platform_instance_map:
-      bigquery: bigquery_platform_instance_id
-
+    # Platform instance mapping to use when constructing URNs.
+    # Use if a single instance of a platform is shared across all connectors.
+    platform_instance_map:
+      mysql: mysql_platform_instance
+
 sink:
-  # sink configs
\ No newline at end of file
+  # sink configs
diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
index 8e313e92cbf84..c943b83a887ed 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/json_schema_util.py
@@ -435,6 +435,7 @@ def _field_from_complex_type(
         field_path._set_parent_type_if_not_exists(
             DataHubType(type=MapTypeClass, nested_type=value_type)
         )
+        # FIXME: description not set. This is present in schema["description"].
         yield from JsonSchemaTranslator.get_fields(
             JsonSchemaTranslator._get_type_from_schema(
                 schema["additionalProperties"]
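For reference, the `provided_configs` section mentioned above takes a list of provider entries. Below is a minimal sketch, assuming an environment-variable config provider; the path key and connection URL shown are hypothetical:

```yml
  # Configs supplied at runtime by Kafka Connect config providers,
  # declared here so DataHub can resolve them when building lineage
  provided_configs:
    - provider: env
      path_key: MYSQL_CONNECTION_URL
      value: jdbc:mysql://test_mysql:3306/librarydb
```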