Skip to content

Commit

Permalink
Optimise domain assignment (#259)
Browse files Browse the repository at this point in the history
* swap domain assignment from transformer to source that creates domains

* Cleaned up comments

* Removed tests for deleted transformer

* Added to test workflow triggers

* Run python tests on changes to python files

* Python tests now run on all pull requests

* Added test for dataset domain assignment events

---------

Co-authored-by: LavMatt <[email protected]>
  • Loading branch information
murdo-moj and LavMatt authored Sep 5, 2024
1 parent 49d2c3f commit d7f1b23
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 227 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,9 @@ name: Run Python Tests
on:
workflow_dispatch:
pull_request:
paths:
- "scripts/**.py"
- "tests/**.py"
push:
branches:
- main
paths:
- "scripts/**.py"
- "tests/**.py"

jobs:
python-unit-tests:
Expand Down
4 changes: 0 additions & 4 deletions ingestion/cadet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ source:
owner_category: DATAOWNER

transformers:
- type: "ingestion.transformers.assign_cadet_domains.AssignCadetDomains"
config:
manifest_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
replace_existing: true
- type: "ingestion.transformers.assign_cadet_databases.AssignCadetDatabases"
config:
manifest_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
43 changes: 36 additions & 7 deletions ingestion/create_cadet_databases_source/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes
from datahub.metadata.schema_classes import ChangeTypeClass, DomainPropertiesClass
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DomainPropertiesClass,
DomainsClass,
)

from ingestion.config import ENV, INSTANCE, PLATFORM
from ingestion.create_cadet_databases_source.config import CreateCadetDatabasesConfig
from ingestion.ingestion_utils import (
format_domain_name,
get_cadet_manifest,
validate_fqn,
parse_database_and_table_names,
)
from ingestion.utils import report_generator_time, report_time

Expand Down Expand Up @@ -53,7 +58,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield wu

# Create database entities and assign them to their domains
databases_with_domains, display_tags = (
databases_with_domains, tables_with_domains, display_tags = (
self._get_databases_with_domains_and_display_tags(manifest)
)
sub_types: list[str] = [DatasetContainerSubTypes.DATABASE]
Expand Down Expand Up @@ -86,6 +91,26 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
extra_properties=None,
)

for database, table, domain in tables_with_domains:
dataset_urn = mce_builder.make_dataset_urn_with_platform_instance(
platform=PLATFORM,
name=f"{database}.{table}",
platform_instance=INSTANCE,
)
domain_name = format_domain_name(domain)
domain_urn = mce_builder.make_domain_urn(domain=domain_name)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspect=DomainsClass(domains=[domain_urn]),
)

wu = MetadataWorkUnit("single_mcp", mcp=mcp)
self.report.report_workunit(wu)
logging.info(f"Assigning {domain_name} domain to {database}.{table}")
yield wu

def _get_domains(self, manifest) -> set[str]:
"""Only models are arranged by domain in CaDeT"""
return set(
Expand All @@ -97,7 +122,7 @@ def _get_domains(self, manifest) -> set[str]:
@report_time
def _get_databases_with_domains_and_display_tags(
self, manifest
) -> tuple[set[tuple[str, str]], dict]:
) -> tuple[set[tuple[str, str]], set[tuple[str, str, str]], dict]:
"""
These mappings will only work with tables named {database}__{table}
like create a derived table.
Expand All @@ -106,24 +131,28 @@ def _get_databases_with_domains_and_display_tags(
display tags, where key is database and value is dc_display_in_catalogue
if any model is to be displayed
"""
mappings = set()
database_mappings = set()
table_mappings = set()
tags = {}
for node in manifest["nodes"]:
if manifest["nodes"][node]["resource_type"] == "model":
fqn = manifest["nodes"][node]["fqn"]
if validate_fqn(fqn):
database = manifest["nodes"][node]["schema"]
database, table = parse_database_and_table_names(
manifest["nodes"][node]
)
domain = fqn[1]
tag = (
"dc_display_in_catalogue"
if "dc_display_in_catalogue" in manifest["nodes"][node]["tags"]
else None
)
mappings.add((database, domain))
database_mappings.add((database, domain))
table_mappings.add((database, table, domain))
if tag is not None:
tags[database] = [tag]

return mappings, tags
return database_mappings, table_mappings, tags

def _make_domain(self, domain_name) -> MetadataChangeProposalWrapper:
domain_urn = mce_builder.make_domain_urn(domain=domain_name)
Expand Down
100 changes: 0 additions & 100 deletions ingestion/transformers/assign_cadet_domains.py

This file was deleted.

83 changes: 0 additions & 83 deletions tests/test_assign_cadet_domains.py

This file was deleted.

61 changes: 34 additions & 27 deletions tests/test_create_cadet_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,40 @@
from ingestion.ingestion_utils import format_domain_name


def test_creating_domains_from_s3():
source = CreateCadetDatabases(
ctx=PipelineContext(run_id="domain-source-test"),
config=CreateCadetDatabasesConfig(
manifest_s3_uri="s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
),
)
class TestCreateCadetDatabases:
def setup_method(self):
source = CreateCadetDatabases(
ctx=PipelineContext(run_id="domain-source-test"),
config=CreateCadetDatabasesConfig(
manifest_s3_uri="s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
),
)
self.results = list(source.get_workunits())

results = list(source.get_workunits())
def test_creating_domains_from_s3(self):
domain_creation_events = self.results[:4]
domains = [event.metadata.aspect.name for event in domain_creation_events]
domains.sort()
assert domains == ["Courts", "HQ", "Prison", "Probation"]

domain_creation_events = results[:4]
domains = [event.metadata.aspect.name for event in domain_creation_events]
domains.sort()
assert domains == ["Courts", "HQ", "Prison", "Probation"]
# 6 events are created per database, we'll just test one
# (create container, update status, add platform, add subtype, associate container with domain)
assert self.results[4].metadata.aspect.customProperties.get("database")
assert self.results[6].metadata.aspect.platform == builder.make_data_platform_urn(
platform="dbt"
)
assert DatasetContainerSubTypes.DATABASE in self.results[7].metadata.aspect.typeNames
assert self.results[8].metadata.entityUrn == self.results[4].metadata.entityUrn
domain_result_4 = (
self.results[4].metadata.aspect.customProperties.get("database").split("_")[0]
)
assert (
builder.make_domain_urn(format_domain_name(domain_result_4))
in self.results[8].metadata.aspect.domains
)

# 6 events are created per database, we'll just test one
# (create container, update status, add platform, add subtype, associate container with domain)
assert results[4].metadata.aspect.customProperties.get("database")
assert results[6].metadata.aspect.platform == builder.make_data_platform_urn(
platform="dbt"
)
assert DatasetContainerSubTypes.DATABASE in results[7].metadata.aspect.typeNames
assert results[8].metadata.entityUrn == results[4].metadata.entityUrn
domain_result_4 = (
results[4].metadata.aspect.customProperties.get("database").split("_")[0]
)
assert (
builder.make_domain_urn(format_domain_name(domain_result_4))
in results[8].metadata.aspect.domains
)
def test_datasets_are_assigned_to_domains(self):
# This is the first event which should associate a dataset with a database
assert self.results[28].metadata.entityType == "dataset"
assert self.results[28].metadata.changeType == "UPSERT"
assert self.results[28].metadata.aspect.domains

0 comments on commit d7f1b23

Please sign in to comment.