Skip to content

Commit

Permalink
Add metadata items to tables and databases (CaDeT) (#1040)
Browse files Browse the repository at this point in the history
* Add metadata items to tables and databases

* Add test for parse_updated

* Amend tests for metadata field changes

* Removed test entites

* Remove Provider, add Enum for Audience

* Add last modified to summary card as time since update

* Use strings for Enum types in CustomEntityProperties

* Update lib/datahub-client/data_platform_catalogue/client/graphql_helpers.py

* Capitalise refresh schedule
  • Loading branch information
murdo-moj authored Nov 15, 2024
1 parent a614b3f commit 9ba56ef
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 53 deletions.
9 changes: 9 additions & 0 deletions home/templatetags/format_timesince.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from home.templatetags.markdown import register


@register.filter
def format_timesince(timesince: str) -> str:
"""
Timesince returns a string like "3 days, 4 hours". This filter will return a string like "3 days".
"""
return timesince.split(",")[0]
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@
parse_data_owner,
parse_domain,
parse_glossary_terms,
parse_last_modified,
parse_names,
parse_properties,
parse_relations,
parse_stewards,
parse_subtypes,
parse_tags,
parse_updated,
)
from data_platform_catalogue.client.search import SearchClient
from data_platform_catalogue.entities import (
Expand Down Expand Up @@ -274,6 +276,8 @@ def get_table_details(self, urn) -> Table:
tags = parse_tags(response)
glossary_terms = parse_glossary_terms(response)
created, modified = parse_created_and_modified(properties)
modified = parse_last_modified(response)
updated = parse_updated(response)
name, display_name, qualified_name = parse_names(response, properties)

lineage_relations = parse_relations(
Expand Down Expand Up @@ -308,6 +312,7 @@ def get_table_details(self, urn) -> Table:
tags=tags,
glossary_terms=glossary_terms,
last_modified=modified,
last_updated=updated,
created=created,
column_details=columns,
custom_properties=custom_properties,
Expand Down Expand Up @@ -369,6 +374,7 @@ def get_database_details(self, urn: str) -> Database:
tags = parse_tags(response)
glossary_terms = parse_glossary_terms(response)
created, modified = parse_created_and_modified(properties)
modified = parse_last_modified(response)
name, display_name, qualified_name = parse_names(response, properties)

child_relations = parse_relations(
Expand Down Expand Up @@ -412,6 +418,7 @@ def get_dashboard_details(self, urn: str) -> Dashboard:
tags = parse_tags(response)
glossary_terms = parse_glossary_terms(response)
created, modified = parse_created_and_modified(properties)
modified = parse_last_modified(response)
name, display_name, qualified_name = parse_names(response, properties)
children = parse_relations(
RelationshipType.CHILD, [response["relationships"]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ query getDatasetDetails($urn: String!) {
}
}
}
runs: runs(start: 0, count: 1, direction: OUTGOING) {
runs {
... on DataProcessInstance {
created {
time
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from collections import defaultdict
from datetime import datetime, timezone
from datetime import datetime
from importlib.resources import files
import logging
from typing import Any, Tuple

from data_platform_catalogue.entities import (
Audience,
AccessInformation,
Column,
ColumnRef,
Expand All @@ -20,6 +22,8 @@
UsageRestrictions,
)

logger = logging.getLogger(__name__)

PROPERTIES_EMPTY_STRING_FIELDS = ("description", "externalUrl")

# Note: Data owner is missing as an ownershipType entity in Datahub, but it still seems to be
Expand Down Expand Up @@ -119,7 +123,7 @@ def parse_last_modified(entity: dict[str, Any]) -> datetime | None:
timestamp = entity.get("lastIngested")
if timestamp is None:
return None
return datetime.fromtimestamp(timestamp / 1000, timezone.utc)
return timestamp


def parse_created_and_modified(
Expand All @@ -132,14 +136,21 @@ def parse_created_and_modified(
if modified == 0:
modified = None

if created is not None:
created = datetime.fromtimestamp(created / 1000, timezone.utc)
if modified is not None:
modified = datetime.fromtimestamp(modified / 1000, timezone.utc)

return created, modified


def parse_updated(
response: dict[str, Any]
) -> datetime | None:
list_of_runs: list = response.get("runs", {}).get("runs", [])
if not list_of_runs:
updated = None
if list_of_runs:
updated = list_of_runs[0].get("created", {}).get("time", {})

return updated


def parse_tags(entity: dict[str, Any]) -> list[TagRef]:
"""
Parse tag information into a list of TagRef objects for displaying
Expand All @@ -166,6 +177,28 @@ def parse_tags(entity: dict[str, Any]) -> list[TagRef]:
return tags


def get_refresh_period_from_cadet_tags(
tags: list[TagRef],
refresh_schedules: list[str] = ["daily", "weekly", "monthly"]
) -> str:
# Check if any of the tags are refresh period tags eg "daily_opg"
relevant_refresh_schedules = [
schedule
for tag_ref in tags
for schedule in refresh_schedules
if schedule in tag_ref.display_name
]
if len(relevant_refresh_schedules) > 1:
logger.warn(f"More than one refresh period tag found: {tags=}")

if relevant_refresh_schedules:
refresh_schedule = relevant_refresh_schedules[0].capitalize()
return refresh_schedule

if not relevant_refresh_schedules:
return ""


def parse_glossary_terms(entity: dict[str, Any]) -> list[GlossaryTermRef]:
"""
Parse glossary_term information into a list of TagRef for displaying
Expand Down Expand Up @@ -215,21 +248,18 @@ def parse_properties(
access_information = AccessInformation.model_validate(custom_properties_dict)
usage_restrictions = UsageRestrictions.model_validate(custom_properties_dict)
data_summary = DataSummary.model_validate(custom_properties_dict)
tags = parse_tags(entity)
data_summary.refresh_period = get_refresh_period_from_cadet_tags(tags)
audience = custom_properties_dict.get("audience", "Internal")

further_information = FurtherInformation.model_validate(custom_properties_dict)

last_updated_timestamp = properties.get("lastRefreshed")
if last_updated_timestamp:
last_updated_date_str = datetime.fromtimestamp(last_updated_timestamp).strftime(
"%d %B %Y"
)
data_summary.last_updated = last_updated_date_str

custom_properties = CustomEntityProperties(
access_information=access_information,
usage_restrictions=usage_restrictions,
data_summary=data_summary,
further_information=further_information,
audience=audience
)

return properties, custom_properties
Expand Down
24 changes: 17 additions & 7 deletions lib/datahub-client/data_platform_catalogue/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class RelationshipType(Enum):
CHILD = "CHILD"


class Audience(Enum):
INTERNAL = "Internal"
PUBLISHED = "Published"


class EntityRef(BaseModel):
"""
A reference to another entity in the metadata graph.
Expand Down Expand Up @@ -323,19 +328,12 @@ class DataSummary(BaseModel):
default="",
examples=["123", 123],
)

refresh_period: str = Field(
description="Indicates the frequency that the data are refreshed/updated",
default="",
examples=["Annually", "Quarterly", "Monthly", "Weekly", "Daily"],
)

last_updated: str = Field(
description="Indicates the date when the data were last refreshed/updated",
default="",
examples=["05 May 2024", "25 December 2023"],
)


class CustomEntityProperties(BaseModel):
"""Custom entity properties not part of DataHub's entity model"""
Expand All @@ -355,6 +353,13 @@ class CustomEntityProperties(BaseModel):
description="Routes to further information about the data",
default_factory=FurtherInformation,
)
audience: Audience = Field(
description="If the data is published or not",
default="Internal",
)

class Config:
use_enum_values = True


class Entity(BaseModel):
Expand Down Expand Up @@ -520,6 +525,11 @@ class Table(Entity):
]
],
)
last_updated: Optional[datetime] = Field(
description="Indicates the time when the data were last refreshed (eg pipeline run with dbt).",
default=None,
examples=[datetime(2011, 10, 2, 3, 0, 0)],
)


class Chart(Entity):
Expand Down
17 changes: 11 additions & 6 deletions lib/datahub-client/tests/client/datahub/test_datahub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ReferencedEntityMissing,
)
from data_platform_catalogue.entities import (
Audience,
AccessInformation,
Chart,
Column,
Expand Down Expand Up @@ -80,8 +81,8 @@ def database(self):
)
]
},
last_modified=datetime(2020, 5, 17),
created=datetime(2020, 5, 17),
last_modified=1710426920000,
created=1710426920000,
tags=[TagRef(urn="test", display_name="test")],
platform=EntityRef(urn="urn:li:dataPlatform:athena", display_name="athena"),
custom_properties=CustomEntityProperties(
Expand Down Expand Up @@ -137,7 +138,7 @@ def table(self):
],
),
tags=[TagRef(display_name="some-tag", urn="urn:li:tag:Entity")],
last_modified=datetime(2024, 3, 5, 6, 16, 47, 814000, tzinfo=timezone.utc),
last_modified=1710426920000,
created=None,
column_details=[
Column(
Expand Down Expand Up @@ -205,7 +206,7 @@ def table2(self):
],
),
tags=[TagRef(display_name="some-tag", urn="urn:li:tag:Entity")],
last_modified=datetime(2024, 3, 5, 6, 16, 47, 814000, tzinfo=timezone.utc),
last_modified=1710426920000,
created=None,
column_details=[
Column(
Expand Down Expand Up @@ -331,6 +332,7 @@ def test_get_dataset(
},
"lastIngested": 1709619407814,
"domain": None,
"provider": "LAA",
"schemaMetadata": {
"fields": [
{
Expand Down Expand Up @@ -374,6 +376,7 @@ def test_get_dataset(
fully_qualified_name="Foo.Dataset",
description="Dataset",
relationships={
RelationshipType.DATA_LINEAGE: [],
RelationshipType.PARENT: [
EntitySummary(
entity_ref=EntityRef(
Expand All @@ -389,15 +392,15 @@ def test_get_dataset(
entity_type="Database",
)
],
RelationshipType.DATA_LINEAGE: [],
},
domain=DomainRef(display_name="", urn=""),
governance=Governance(
data_owner=OwnerRef(display_name="", email="", urn=""),
data_stewards=[],
),
tags=[TagRef(display_name="some-tag", urn="urn:li:tag:Entity")],
last_modified=datetime(2024, 3, 5, 6, 16, 47, 814000, tzinfo=timezone.utc),
last_modified=1709619407814,
provider="LAA",
created=None,
platform=EntityRef(urn="datahub", display_name="datahub"),
column_details=[
Expand Down Expand Up @@ -479,6 +482,7 @@ def test_get_dataset_minimal_properties(
),
data_summary=DataSummary(),
further_information=FurtherInformation(),
audience=Audience.INTERNAL,
),
column_details=[],
)
Expand Down Expand Up @@ -538,6 +542,7 @@ def test_get_chart_details(self, datahub_client, base_mock_graph):
),
data_summary=DataSummary(),
further_information=FurtherInformation(),
audience=Audience.INTERNAL,
),
external_url="https://data.justice.gov.uk/prisons/public-protection/absconds",
)
Expand Down
Loading

0 comments on commit 9ba56ef

Please sign in to comment.