Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/transformer): generate ownership aspect from handle_end_of_stream #9720

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
def _update_work_unit_id(
envelope: RecordEnvelope, urn: str, aspect_name: str
) -> Dict[Any, Any]:
structured_urn = Urn.create_from_string(urn)
simple_name = "-".join(structured_urn.get_entity_id())
structured_urn = Urn.from_string(urn)
simple_name = "-".join(structured_urn.entity_ids)
record_metadata = envelope.metadata.copy()
record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"})
return record_metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import re
from functools import lru_cache
from typing import List, Optional, cast
from typing import List, Optional, Sequence, Union, cast

from datahub.configuration.common import TransformerSemanticsConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata._schema_classes import MetadataChangeProposalClass
from datahub.metadata.schema_classes import (
GlobalTagsClass,
OwnerClass,
Expand All @@ -16,6 +19,8 @@
from datahub.utilities.urns.corpuser_urn import CorpuserUrn
from datahub.utilities.urns.tag_urn import TagUrn

logger = logging.getLogger(__name__)


class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel):
tag_prefix: str
Expand All @@ -38,11 +43,13 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):

ctx: PipelineContext
config: ExtractOwnersFromTagsConfig
owner_mcps: List[MetadataChangeProposalWrapper]

def __init__(self, config: ExtractOwnersFromTagsConfig, ctx: PipelineContext):
super().__init__()
self.ctx = ctx
self.config = config
self.owner_mcps = []

@classmethod
def create(
Expand All @@ -56,6 +63,12 @@ def get_owner_urn(self, owner_str: str) -> str:
return owner_str + "@" + self.config.email_domain
return owner_str

def handle_end_of_stream(
self,
) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:

return self.owner_mcps

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
Expand All @@ -64,28 +77,39 @@ def transform_aspect(
return None
tags = in_tags_aspect.tags
owners: List[OwnerClass] = []

for tag_class in tags:
tag_urn = TagUrn.from_string(tag_class.tag)
tag_str = tag_urn.get_entity_id()[0]
tag_str = tag_urn.entity_ids[0]
re_match = re.search(self.config.tag_prefix, tag_str)
if re_match:
owner_str = tag_str[re_match.end() :].strip()
owner_urn_str = self.get_owner_urn(owner_str)
if self.config.is_user:
owner_urn = str(CorpuserUrn.create_from_id(owner_urn_str))
owner_urn = str(CorpuserUrn(owner_urn_str))
else:
owner_urn = str(CorpGroupUrn.create_from_id(owner_urn_str))
owner_urn = str(CorpGroupUrn(owner_urn_str))
owner_type = get_owner_type(self.config.owner_type)
if owner_type == OwnershipTypeClass.CUSTOM:
assert (
self.config.owner_type_urn is not None
), "owner_type_urn must be set if owner_type is CUSTOM"
owner = OwnerClass(
owner=owner_urn,
type=owner_type,
typeUrn=self.config.owner_type_urn,

owners.append(
OwnerClass(
owner=owner_urn,
type=owner_type,
typeUrn=self.config.owner_type_urn,
)
)
owners.append(owner)

owner_aspect = OwnershipClass(owners=owners)
return cast(Aspect, owner_aspect)
self.owner_mcps.append(
MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=OwnershipClass(
owners=owners,
),
)
)

return None
25 changes: 21 additions & 4 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,22 +648,35 @@ def _test_owner(
)
]
)

transformer = ExtractOwnersFromTagsTransformer.create(
config,
PipelineContext(run_id="test"),
)
transformed = list(

record_envelops: List[RecordEnvelope] = list(
transformer.transform(
[
RecordEnvelope(dataset, metadata={}),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we put an end-of-stream record in here, it should do everything, right?

I don't think our tests should be calling `handle_end_of_stream`.

RecordEnvelope(record=EndOfStream(), metadata={}),
]
)
)
owners_aspect = transformed[0].record.proposedSnapshot.aspects[0]

assert len(record_envelops) == 3

mcp: MetadataChangeProposalWrapper = record_envelops[1].record

owners_aspect = cast(OwnershipClass, mcp.aspect)

owners = owners_aspect.owners

owner = owners[0]
if expected_owner_type is not None:
assert owner.type == expected_owner_type

assert expected_owner_type is not None

assert owner.type == expected_owner_type

assert owner.owner == expected_owner

_test_owner(
Expand All @@ -672,13 +685,15 @@ def _test_owner(
"tag_prefix": "owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="abcdef-owner:foo",
config={
"tag_prefix": ".*owner:",
},
expected_owner="urn:li:corpuser:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
Expand All @@ -687,6 +702,7 @@ def _test_owner(
"is_user": False,
},
expected_owner="urn:li:corpGroup:foo",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
Expand All @@ -695,6 +711,7 @@ def _test_owner(
"email_domain": "example.com",
},
expected_owner="urn:li:corpuser:foo@example.com",
expected_owner_type=OwnershipTypeClass.TECHNICAL_OWNER,
)
_test_owner(
tag="owner:foo",
Expand Down
Loading