Skip to content

Commit

Permalink
fix(ingest/powerbi): reorganize dataset container emit order
Browse files · Browse the repository at this point in the history
  • Loading branch information
looppi committed Nov 7, 2023
1 parent 9b884a1 commit 8f9b395
Show file tree
Hide file tree
Showing 3 changed files with 418 additions and 288 deletions.
20 changes: 11 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ def to_datahub_dataset(
f"Mapping dataset={dataset.name}(id={dataset.id}) to datahub dataset"
)

if self.__config.extract_datasets_to_containers:
dataset_mcps.extend(self.generate_container_for_dataset(dataset))

for table in dataset.tables:
# Create a URN for dataset
ds_urn = builder.make_dataset_urn_with_platform_instance(
Expand Down Expand Up @@ -747,15 +750,21 @@ def generate_container_for_workspace(

def generate_container_for_dataset(
    self, dataset: powerbi_data_classes.PowerBIDataset
) -> Iterable[MetadataChangeProposalWrapper]:
    """Build container aspects for a PowerBI dataset, parented under the workspace.

    Returns the MetadataChangeProposalWrapper objects (not MetadataWorkUnits),
    so callers can append them to a dataset's MCP list and control emit order.

    :param dataset: the PowerBI dataset to represent as a container
    :return: MCPs describing the dataset container
    """
    dataset_key = dataset.get_dataset_key(self.__config.platform_name)
    container_work_units = gen_containers(
        container_key=dataset_key,
        name=dataset.name if dataset.name else dataset.id,
        parent_container_key=self.workspace_key,
        sub_types=[BIContainerSubTypes.POWERBI_DATASET],
    )

    # gen_containers yields MetadataWorkUnits whose .metadata is a union type;
    # the isinstance filter unwraps the MCPs and narrows the type for mypy.
    # In practice every emitted item is a MetadataChangeProposalWrapper.
    return [
        wu.metadata
        for wu in container_work_units
        if isinstance(wu.metadata, MetadataChangeProposalWrapper)
    ]

def append_tag_mcp(
self,
Expand Down Expand Up @@ -1211,10 +1220,6 @@ def validate_dataset_type_mapping(self):
f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}"
)

def extract_datasets_as_containers(self):
    """Yield container work units for each dataset the mapper has processed."""
    mapper = self.mapper
    for processed_dataset in mapper.processed_datasets:
        yield from mapper.generate_container_for_dataset(processed_dataset)

def extract_independent_datasets(
self, workspace: powerbi_data_classes.Workspace
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -1263,9 +1268,6 @@ def get_workspace_workunit(
):
yield work_unit

if self.source_config.extract_datasets_to_containers:
yield from self.extract_datasets_as_containers()

yield from self.extract_independent_datasets(workspace)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
Expand Down
Loading

0 comments on commit 8f9b395

Please sign in to comment.