Ingest schema metadata from Glue schema registry #8858
base: master
Conversation
Hey @deepak-shingavi-cko, I'm planning to take a look at this PR in the next 1-2 days.
Can you please fix the lint failure? This guide covers how to do that: https://datahubproject.io/docs/metadata-ingestion/developing/#code-style
Also, did you get a chance to test this change in your local environment with a real Glue and Kafka instance? It would also be great if you could write a few unit tests along the lines of test_confluent_schema_registry.
This needs some more work in addition to the lint fix and unit tests.
Glad to help with any queries.
By default, it follows the [Confluent's Kafka Schema registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and supports the `JSON`, `AVRO` and `PROTOBUF` schema types.
Not sure what this line means here
@@ -148,6 +148,7 @@ def get_long_description():
    # Deal with a version incompatibility between botocore (used by boto3) and urllib3.
    # See https://github.com/boto/botocore/pull/2563.
    "botocore!=1.23.0",
    "aws-glue-schema-registry==1.1.2"
This should probably go under the "kafka" dependencies in setup.py.
Or we could add aws_common as a "kafka" plugin dependency, given that boto is also required (roughly as sketched below).
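A rough sketch of what that could look like in setup.py; the variable names (aws_common, kafka_common), version pins, and surrounding structure are illustrative and may not match the actual file:

```python
# Illustrative fragment of setup.py; the real file's structure may differ.
aws_common = {
    "boto3",
    # Deal with a version incompatibility between botocore (used by boto3) and urllib3.
    # See https://github.com/boto/botocore/pull/2563.
    "botocore!=1.23.0",
}

kafka_common = {
    "confluent_kafka>=1.5.0",
    "fastavro>=1.2.0",
}

plugins = {
    # Glue support is only needed by the kafka plugin, so the Glue client
    # (via aws_common) and the serde library are pulled in here rather than
    # in the core install.
    "kafka": kafka_common | aws_common | {"aws-glue-schema-registry==1.1.2"},
}
```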
class GlueSchemaRegistry(KafkaSchemaRegistryBase):
    """
    This is confluent schema registry specific implementation of datahub.ingestion.source.kafka import SchemaRegistry
Suggested change:
- This is confluent schema registry specific implementation of datahub.ingestion.source.kafka import SchemaRegistry
+ This is glue schema registry specific implementation of datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
self.schema_registry_client = SchemaRegistryClient(self.glue_client, registry_name=source_config.registry_name)

self.known_schema_registry_subjects: List[str] = []
logger.warning(f"Get subjects from schema registry will be empty as we are using Glue.")
Suggested change:
- logger.warning(f"Get subjects from schema registry will be empty as we are using Glue.")
+ logger.warning("Get subjects from schema registry will be empty as we are using Glue.")
)
aws_account_region: str = pydantic.Field(
    default={},
    description="The name of the Glue Schema Registry from where the schema metadata will be ingested.",
The description needs to be updated here.
Can we add a class GlueSchemaRegistryOptions and define these fields inside it, such that the kafka config has a field:
glue_schema_registry_options: Optional[GlueSchemaRegistryOptions]  # Required if using Glue Schema Registry
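A minimal sketch of that shape, assuming plain pydantic models; the field names and descriptions are illustrative, and the real config class may be based on DataHub's config base classes instead:

```python
from typing import Optional

import pydantic


class GlueSchemaRegistryOptions(pydantic.BaseModel):
    """Groups Glue-specific settings instead of adding them as top-level kafka config fields."""

    registry_name: str = pydantic.Field(
        description="Name of the Glue Schema Registry from which schema metadata will be ingested.",
    )
    aws_account_region: str = pydantic.Field(
        description="AWS region of the Glue Schema Registry, e.g. eu-west-1.",
    )


class KafkaSourceConfig(pydantic.BaseModel):
    # Illustrative subset of the real kafka source config:
    # only required when the Glue schema registry implementation is selected.
    glue_schema_registry_options: Optional[GlueSchemaRegistryOptions] = None
```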
schema_type_str: str = "key" if is_key_schema else "value"
topic_subject: Optional[str] = 'key' if is_key_schema else "value"

print(f"topic_subject=={topic_subject is not None}")
Change this to a debug log.
if ref_subject in schema_seen:
    continue
reference_schema: RegisteredSchema = (
    self.schema_registry_client.get_latest_version(ref_subject)
What is the difference between glue_client.get_schema_version, schema_registry_client.get_latest_version, and glue_client.get_latest_version? Can we use the same method in all places?
RegisteredSchema,
Schema,
SchemaReference
|
It's not clear why we would need confluent_kafka imports here.
The idea behind supporting an additional schema registry is to provide an implementation of the following method for the new registry:
def get_schema_metadata(
    self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
Assuming the Glue client and the AWS schema registry client have all the methods and classes needed for interfacing with schemas, these imports should not be necessary.
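For concreteness, here is a rough skeleton of a Glue-backed implementation built only on boto3 and DataHub's SchemaMetadata model. It is a sketch under assumptions, not this PR's code: the constructor signature, the `-value` subject naming, and the empty fields list are placeholders.

```python
from typing import Optional

import boto3

from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    KafkaSchema,
    SchemaMetadata,
)


class GlueSchemaRegistry(KafkaSchemaRegistryBase):
    def __init__(self, source_config, report) -> None:
        self.config = source_config
        self.report = report
        # boto3's Glue client is sufficient for reading schema definitions,
        # so no confluent_kafka schema-registry client is required.
        self.glue_client = boto3.client(
            "glue", region_name=source_config.aws_account_region
        )

    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        # Fetch the latest registered version of the value schema for this topic.
        response = self.glue_client.get_schema_version(
            SchemaId={
                "SchemaName": f"{topic}-value",
                "RegistryName": self.config.registry_name,
            },
            SchemaVersionNumber={"LatestVersion": True},
        )
        schema_definition = response.get("SchemaDefinition")
        if not schema_definition:
            return None
        return SchemaMetadata(
            schemaName=topic,
            platform=platform_urn,
            version=response.get("VersionNumber", 1),
            hash=str(response.get("SchemaVersionId", "")),
            platformSchema=KafkaSchema(documentSchema=schema_definition),
            # Parsing the AVRO/JSON/PROTOBUF definition into SchemaField
            # objects is omitted in this sketch.
            fields=[],
        )
```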
registered_schema = self.glue_client.get_schema_version(
    SchemaId={
        'SchemaName': topic + '-' + topic_subject,
        'RegistryName': registry_name
Suggested change:
- 'RegistryName': registry_name
+ 'RegistryName': self.config.registry_name
DataHub currently does not support metadata ingestion from the Glue Schema Registry.
With this change, DataHub users can configure an ingestion that pulls a Kafka topic's schema definition and documentation via a sample ingestion file, as in the sketch below.
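A hypothetical ingestion, expressed as a programmatic pipeline so the example stays in Python: the schema_registry_class module path, the Glue-specific fields (registry_name, aws_account_region), and the broker/server addresses are assumptions about this PR's configuration, not confirmed names.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Sketch only: the field names under the kafka source config mirror this PR's
# proposed options and may change during review.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka",
            "config": {
                "connection": {"bootstrap": "localhost:9092"},
                # Select the Glue-backed registry implementation (assumed module path).
                "schema_registry_class": "datahub.ingestion.source.glue_schema_registry.GlueSchemaRegistry",
                "registry_name": "my-glue-registry",
                "aws_account_region": "eu-west-1",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```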
Checklist