feat(ingest/kafka): support metadata mapping from kafka avro schemas #8825
Co-authored-by: Daniel Messias <[email protected]> Co-authored-by: Deepankarkr <[email protected]>
@mayurinehate FYI, there's a test failing on this one. I'll try to review this in the next two days.
I've fixed the test. Thanks for letting me know.
self.field_meta_processor = OperationProcessor(
    self.source_config.field_meta_mapping,
    self.source_config.tag_prefix,
    "SOURCE_CONTROL",
I'm not sure what this should be. "SOURCE_CONTROL" is probably okay for dbt. Should we set the ownership source to "OTHER" or "MANUAL" here?
let's use OwnershipSourceTypeClass.SERVICE
done
)
tag_prefix: str = pydantic.Field(
    default="", description="Prefix added to tags during ingestion."
)
The default is "dbt:" for dbt. Do we need to set the default to "kafka:" here?
honestly not sure why we added tag prefix at all
if we did want to support it, it should be implemented as a workunit helper right?
so anyways, empty is fine for now, with a goal of removing that field with something cleaner in the future
I'm not sure either. Do you mean a workunit processor? Yes, that would be better.
Also, we can remove the prefix handling from kafka if it's not required.
):
    self._schema = schema
    self._actual_schema = actual_schema
    self._converter = converter
    self._description = description
    self._default_value = default_value
    self._meta_mapping_processor = meta_mapping_processor
    self._schema_tags_field = schema_tags_field
    self._tag_prefix = tag_prefix
we can get these using converter._something, right? is there a reason to pass them explicitly?
That makes sense. There is no reason to pass them explicitly, so I've removed them.
try:
    raw_props_value = reduce(
        operator.getitem, operation_key.split("."), raw_props
    )
a comment would be helpful here
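For readers following the thread, here is a minimal, commented sketch of what the `reduce(operator.getitem, ...)` idiom in the hunk above does: it walks a dotted key through nested dictionaries one segment at a time. The helper name `get_nested_value`, the sample `props` data, and the choice of exceptions to catch are assumptions made for illustration; they are not taken from the PR.

```python
import operator
from functools import reduce
from typing import Any, Dict, Optional


def get_nested_value(raw_props: Dict[str, Any], operation_key: str) -> Optional[Any]:
    """Resolve a dotted key such as "a.b.c" against nested dictionaries.

    Hypothetical helper: the name and the error handling are assumptions.
    """
    try:
        # reduce() applies operator.getitem left to right over the key segments:
        # raw_props["a"] -> (...)["b"] -> (...)["c"]
        return reduce(operator.getitem, operation_key.split("."), raw_props)
    except (KeyError, TypeError):
        # KeyError: a path segment is missing from a dictionary.
        # TypeError: an intermediate value is not subscriptable by a string key.
        return None


# Hypothetical properties, as they might appear in an Avro schema.
props = {"governance": {"owner": "data-team"}, "pii": True}

owner = get_nested_value(props, "governance.owner")        # nested hit
missing = get_nested_value(props, "governance.steward")    # missing leaf
not_nested = get_nested_value(props, "pii.level")          # "pii" is not a dict
```

A key without dots degrades to a plain top-level lookup, so flat and nested properties can share one code path.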
CI failure is unrelated.
Adaptation of this PR: #7381, with a slightly different implementation and more tests. Please refer to the original PR for a detailed description of the feature.
In short, this PR enables dbt-style meta mapping for dataset-level and field-level additional properties present in the Avro schema of a Kafka topic. It also allows tags listed in the Avro schema to be translated into DataHub tags.
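To make the idea concrete, the sketch below shows roughly how a match-and-operation mapping could be applied to extra properties in an Avro schema. It is a simplified stand-in, not the PR's `OperationProcessor`: the function `apply_mapping`, the sample schema, the property names (`owner`, `sensitivity`, `tags`), and the rule values are all hypothetical, and the matching semantics (`re.fullmatch` on the stringified value) are an assumption.

```python
import re
from typing import Any, Dict, List

# Hypothetical Avro schema carrying additional (non-standard) properties.
avro_schema: Dict[str, Any] = {
    "type": "record",
    "name": "Payment",
    "owner": "payments-team",      # dataset-level additional property
    "tags": ["finance", "pii"],    # tags listed directly in the schema
    "fields": [
        {"name": "card_number", "type": "string", "sensitivity": "high"},
        {"name": "amount", "type": "double"},
    ],
}

# Simplified dbt-style rules: property key -> regex to match plus an operation.
meta_mapping = {"owner": {"match": ".*", "operation": "add_owner"}}
field_meta_mapping = {
    "sensitivity": {
        "match": "high|critical",
        "operation": "add_tag",
        "config": {"tag": "sensitive"},
    }
}


def apply_mapping(props: Dict[str, Any], mapping: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the operations whose key exists in props and whose regex matches."""
    results = []
    for key, rule in mapping.items():
        if key not in props:
            continue
        value = str(props[key])
        if re.fullmatch(rule["match"], value):
            results.append(
                {
                    "operation": rule["operation"],
                    "value": value,
                    "config": rule.get("config", {}),
                }
            )
    return results


dataset_ops = apply_mapping(avro_schema, meta_mapping)
field_ops = {
    field["name"]: apply_mapping(field, field_meta_mapping)
    for field in avro_schema["fields"]
}

# Schema-listed tags pass through, optionally prefixed (empty prefix by default).
tag_prefix = ""
schema_tags = [tag_prefix + tag for tag in avro_schema.get("tags", [])]
```

Here the dataset-level rule yields one `add_owner` operation, only `card_number` yields an `add_tag` operation, and the schema's own tags are carried over unchanged because the prefix is empty.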
Checklist