Skip to content

Commit

Permalink
Add way to filter by aspect_type in metadata_sync (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Nov 4, 2024
1 parent a88018e commit 83b99a7
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
MetadataChangeLogClass,
MetadataChangeProposalClass,
)
from pydantic import BaseModel
from pydantic import BaseModel, Field

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
Expand All @@ -21,6 +21,7 @@ class MetadataChangeEmitterConfig(BaseModel):
gms_server: Optional[str]
gms_auth_token: Optional[str]
aspects_to_exclude: Optional[List]
entity_type_to_exclude: List[str] = Field(default_factory=list)
extra_headers: Optional[Dict[str, str]]


Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, config: MetadataChangeEmitterConfig, ctx: PipelineContext):
if self.config.aspects_to_exclude
else self.DEFAULT_ASPECTS_EXCLUDE_SET
)

extra_headers_keys = (
list(self.config.extra_headers.keys())
if self.config.extra_headers
Expand All @@ -77,10 +79,18 @@ def act(self, event: EventEnvelope) -> None:
if event.event_type is METADATA_CHANGE_LOG_EVENT_V1_TYPE:
orig_event = cast(MetadataChangeLogClass, event.event)
logger.debug(f"received orig_event {orig_event}")
if orig_event.get("aspectName") not in self.aspects_exclude_set:
if (orig_event.get("aspectName") not in self.aspects_exclude_set) and (
orig_event.get("entityType") not in self.config.entity_type_to_exclude
if self.config.entity_type_to_exclude
else True
):
mcp = self.buildMcp(orig_event)
if mcp is not None:
self.emit(mcp)
else:
logger.debug(
f"skip emitting mcp for aspect as {orig_event.get('aspectName')} or entity type {orig_event.get('entityType')} on exclude list"
)

def buildMcp(
self, orig_event: MetadataChangeLogClass
Expand Down

0 comments on commit 83b99a7

Please sign in to comment.