Skip to content

Commit

Permalink
feat/fmd-206 add ingestion timing (#213)
Browse files Browse the repository at this point in the history
* fix: Notify on failure GHA steps referencing incorrect `input` var for environment

* fix: downversion datahub cli to `0.13.2.4`

* chore: type checker linting (fixes pylance type checking errors)

* chore: use non-deprecated suggested methods from datahub utils

* perf: time ingestion sources and transformers

- add calls to `time` unix command to `datahub ingest` calls
- add decorators for timing function/iterator runs
- use the timers for
  - create_cadet_databases_source
  - justice_data_source
  - `AssignCadetDatabases` transformer

Co-authored-by: Mat Moore <[email protected]>
  • Loading branch information
tom-webber and Mat Moore authored Aug 8, 2024
1 parent 3591714 commit ffcbfe9
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 321 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ingest-cadet-metadata.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ jobs:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
run: poetry run datahub ingest -c ingestion/create_cadet_databases.yaml
run: time poetry run datahub ingest -c ingestion/create_cadet_databases.yaml

- name: push metadata to datahub
env:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
run: poetry run datahub ingest -c ingestion/cadet.yaml
run: time poetry run datahub ingest -c ingestion/cadet.yaml

- name: Notify on failure
uses: slackapi/[email protected]
Expand All @@ -87,7 +87,7 @@ jobs:
channel-id: "C071VNHPUHZ"
payload: |
{
"text": ":warning: Unable to ingest CaDeT metadata on ${{inputs.env}}!",
"text": ":warning: Unable to ingest CaDeT metadata on ${{inputs.ENVIRONMENT}}!",
"blocks": [
{
"type": "section",
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ingest-justice-data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
DATAHUB_GMS_TOKEN: ${{ secrets.DATAHUB_GMS_TOKEN }}
DATAHUB_GMS_URL: ${{ vars.DATAHUB_GMS_URL }}
DATAHUB_TELEMETRY_ENABLED: false
run: poetry run datahub ingest -c ingestion/justice_data_ingest.yaml
run: time poetry run datahub ingest -c ingestion/justice_data_ingest.yaml

- name: Notify on failure
uses: slackapi/[email protected]
Expand All @@ -72,7 +72,7 @@ jobs:
channel-id: "C071VNHPUHZ"
payload: |
{
"text": ":warning: Unable to ingest Justice Data metadata on ${{inputs.env}}!",
"text": ":warning: Unable to ingest Justice Data metadata on ${{inputs.ENVIRONMENT}}!",
"blocks": [
{
"type": "section",
Expand Down
8 changes: 7 additions & 1 deletion ingestion/create_cadet_databases_source/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
get_cadet_manifest,
validate_fqn,
)
from ingestion.utils import report_generator_time, report_time

logging.basicConfig(level=logging.DEBUG)


@config_class(CreateCadetDatabasesConfig)
class CreateCadetDatabases(Source):
source_config: CreateCadetDatabasesConfig
report: SourceReport = SourceReport()

@report_time
def __init__(self, config: CreateCadetDatabasesConfig, ctx: PipelineContext):
super().__init__(ctx)
self.source_config = config
Expand All @@ -35,6 +39,7 @@ def create(cls, config_dict, ctx):
config = CreateCadetDatabasesConfig.parse_obj(config_dict)
return cls(config, ctx)

@report_generator_time
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
manifest = get_cadet_manifest(self.source_config.manifest_s3_uri)

Expand All @@ -51,7 +56,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
databases_with_domains, display_tags = (
self._get_databases_with_domains_and_display_tags(manifest)
)
sub_types = [DatasetContainerSubTypes.DATABASE]
sub_types: list[str] = [DatasetContainerSubTypes.DATABASE]
last_modified = int(datetime.now().timestamp())
for database, domain in databases_with_domains:
database_container_key = mcp_builder.DatabaseKey(
Expand Down Expand Up @@ -89,6 +94,7 @@ def _get_domains(self, manifest) -> set[str]:
if manifest["nodes"][node]["resource_type"] == "model"
)

@report_time
def _get_databases_with_domains_and_display_tags(
self, manifest
) -> tuple[set[tuple[str, str]], dict]:
Expand Down
6 changes: 5 additions & 1 deletion ingestion/ingestion_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

from ingestion.config import ENV, INSTANCE, PLATFORM
from ingestion.utils import report_time

logging.basicConfig(level=logging.DEBUG)


@report_time
def get_cadet_manifest(manifest_s3_uri: str) -> Dict:
try:
s3 = boto3.client("s3")
Expand Down Expand Up @@ -49,7 +53,7 @@ def validate_fqn(fqn: list[str]) -> bool:
f"{table_name=} has multiple double underscores which will confuse parsing"
)

match = re.match(r"\w+__\w+", table_name)
match: re.Match[str] | None = re.match(r"\w+__\w+", table_name)
if match:
return True
if not match:
Expand Down
6 changes: 5 additions & 1 deletion ingestion/justice_data_source/source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from io import BufferedReader
from typing import Iterable, Optional

Expand Down Expand Up @@ -25,7 +26,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsV2Class,
ChangeTypeClass,
ChartInfoClass,
CorpGroupInfoClass,
Expand All @@ -38,10 +38,13 @@
)

from ingestion.ingestion_utils import list_datahub_domains
from ingestion.utils import report_generator_time

from .api_client import JusticeDataAPIClient
from .config import JusticeDataAPIConfig

logging.basicConfig(level=logging.DEBUG)


@platform_name("File")
@config_class(JusticeDataAPIConfig)
Expand Down Expand Up @@ -72,6 +75,7 @@ def create(cls, config_dict, ctx):
config = JusticeDataAPIConfig.parse_obj(config_dict)
return cls(ctx, config)

@report_generator_time
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
all_chart_data = self.client.list_all(self.config.exclude_id_list)

Expand Down
2 changes: 2 additions & 0 deletions ingestion/taggers/display_in_catalogue_tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import TagAssociationClass

logging.basicConfig(level=logging.DEBUG)


def add_display_in_catalogue_tag(entity_urn: str) -> List[TagAssociationClass]:
"""
Expand Down
5 changes: 5 additions & 0 deletions ingestion/transformers/assign_cadet_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
parse_database_and_table_names,
validate_fqn,
)
from ingestion.utils import report_time

logging.basicConfig(level=logging.DEBUG)


class AssignCadetDatabasesConfig(ConfigModel):
Expand Down Expand Up @@ -49,6 +52,7 @@ def transform_aspect(
) -> Optional[Aspect]:
return None

@report_time
def handle_end_of_stream(
self,
) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
Expand Down Expand Up @@ -77,6 +81,7 @@ def handle_end_of_stream(

return mcps

@report_time
def _get_table_database_mappings(self, manifest) -> Dict[str, str]:
mappings = {}
for node in manifest["nodes"]:
Expand Down
47 changes: 29 additions & 18 deletions ingestion/transformers/assign_cadet_domains.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
from typing import Callable, Union
from typing import Iterable

from datahub.configuration.common import (
KeyValuePattern,
TransformerSemanticsConfigModel,
)
from datahub.configuration.import_resolver import pydantic_resolve_key
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.dataset_domain import (
AddDatasetDomain,
AddDatasetDomainSemanticsConfig,
)
from datahub.metadata.schema_classes import DomainsClass

from ingestion.ingestion_utils import (
convert_cadet_manifest_table_to_datahub,
get_cadet_manifest,
validate_fqn,
)


class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
get_domains_to_add: Union[
Callable[[str], DomainsClass],
Callable[[str], DomainsClass],
]

_resolve_domain_fn = pydantic_resolve_key("get_domains_to_add")
from ingestion.utils import Stopwatch, report_time


class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
Expand All @@ -39,26 +33,42 @@ class AssignCadetDomains(AddDatasetDomain):
def __init__(self, config: CadetDatasetDomainSemanticsConfig, ctx: PipelineContext):
AddDatasetDomain.raise_ctx_configuration_error(ctx)
manifest = get_cadet_manifest(config.manifest_s3_uri)
domain_mappings = self._get_domain_mapping(manifest)
domain_pattern = domain_mappings.domain_pattern
domain_mappings: PatternDatasetDomainSemanticsConfig = self._get_domain_mapping(
manifest
)
domain_pattern: KeyValuePattern = domain_mappings.domain_pattern

def resolve_domain(domain_urn: str) -> DomainsClass:
domains = domain_pattern.value(domain_urn)
return self.get_domain_class(ctx.graph, domains)

generic_config = AddDatasetDomainSemanticsConfig(
get_domains_to_add=resolve_domain,
semantics=config.semantics,
replace_existing=config.replace_existing,
get_domains_to_add=resolve_domain,
)

self.transform_timer = Stopwatch(transformer="AssignCadetDomains")

super().__init__(generic_config, ctx)

def _should_process(self, record):
if not self.transform_timer.running:
self.transform_timer.start()
return super()._should_process(record)

def _handle_end_of_stream(
self, envelope: RecordEnvelope
) -> Iterable[RecordEnvelope]:
self.transform_timer.stop()
self.transform_timer.report()
return super()._handle_end_of_stream(envelope)

@classmethod
def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
try:
manifest_s3_uri = config_dict.get("manifest_s3_uri")
replace_existing = config_dict.get("replace_existing", False)
manifest_s3_uri: str = config_dict.get("manifest_s3_uri", "")
replace_existing: bool = config_dict.get("replace_existing", False)
except Exception as e:
print(e)
raise
Expand All @@ -69,6 +79,7 @@ def create(cls, config_dict, ctx: PipelineContext) -> "AssignCadetDomains":
)
return cls(config_dict, ctx)

@report_time
def _get_domain_mapping(self, manifest) -> PatternDatasetDomainSemanticsConfig:
"""Map regex patterns for tables to domains"""
nodes = manifest.get("nodes")
Expand Down
88 changes: 88 additions & 0 deletions ingestion/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import functools
import logging
import time
from datetime import timedelta

logging.basicConfig(level=logging.DEBUG)


def report_time(func):
    """
    Decorator to report the total time of a function call.

    Wraps ``func`` so each call is timed with a :class:`Stopwatch` labelled
    with the function name, the positional argument types and the keyword
    arguments. The timing is logged via ``Stopwatch.report()`` after the
    call completes — including when ``func`` raises, so failed runs are
    still accounted for.

    Returns the wrapped function; the wrapped call returns whatever
    ``func`` returns (or re-raises its exception).
    """

    # functools.wraps preserves func.__name__/__doc__ so the decorated
    # function still identifies itself correctly (DataHub inspects names).
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        arg_types = [type(arg) for arg in args]
        stopwatch = Stopwatch(
            function=func.__name__, arg_types=arg_types, kwargs=kwargs
        )

        stopwatch.start()
        try:
            return func(*args, **kwargs)
        finally:
            # Report even on exception so slow failing calls are visible.
            stopwatch.stop()
            stopwatch.report()

    return wrapped_func


def report_generator_time(func):
    """
    Decorator to report the total time taken to exhaust a generator.

    Wraps a generator function ``func`` so that the elapsed time from the
    first ``next()`` until the generator is exhausted (or closed/raises)
    is timed with a :class:`Stopwatch` and logged via
    ``Stopwatch.report()``. The original implementation only reported on
    clean exhaustion and ended with a dead ``return`` of the spent
    generator; the ``try/finally`` here reports in all cases.
    """

    # functools.wraps preserves func.__name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        arg_types = [type(arg) for arg in args]
        stopwatch = Stopwatch(
            function=func.__name__, arg_types=arg_types, kwargs=kwargs
        )

        stopwatch.start()
        try:
            yield from func(*args, **kwargs)
        finally:
            # Runs on exhaustion, on consumer error, and on GeneratorExit,
            # so abandoned iterations still get a timing report.
            stopwatch.stop()
            stopwatch.report()

    return wrapped_func


class Stopwatch:
    """
    Wrapper around the time module for timing code execution.

    Usage: construct with arbitrary keyword metadata (folded into the log
    prefix), then call ``start()``, ``stop()`` and ``report()``. ``stop()``
    accumulates into ``elapsed``, so a stopwatch may be started and stopped
    repeatedly and ``report()`` shows the total across all runs.
    """

    def __init__(self, **meta):
        # running: True between start() and stop().
        self.running = False
        # start_time/stop_time: unix timestamps of the most recent run,
        # None until the corresponding method has been called.
        self.start_time = None
        self.stop_time = None
        # elapsed: cumulative seconds across all start/stop cycles.
        self.elapsed = 0.0
        joined_meta = ", ".join(f"{k}={v}" for k, v in meta.items())
        self.prefix = f"TIMING: {joined_meta}, " if joined_meta else "TIMING: "

    def start(self):
        """Begin (or resume) timing from now."""
        self.start_time = time.time()
        self.running = True

    def stop(self):
        """Stop timing and add the run to ``elapsed``; no-op if never started."""
        self.running = False
        # `is None` rather than truthiness: a falsy (epoch-0) timestamp is
        # still a valid start time and must not be silently discarded.
        if self.start_time is None:
            return

        now = time.time()
        self.stop_time = now
        self.elapsed += now - self.start_time

    def report(self):
        """Log start/end (UTC) and total elapsed time at INFO level."""
        # Lazy %-style args: interpolation only happens if INFO is enabled.
        logging.info(
            "%sstart_time=%s, end_time=%s, elapsed_time=%s",
            self.prefix,
            time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self.start_time)),
            time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self.stop_time)),
            timedelta(seconds=self.elapsed),
        )
Loading

0 comments on commit ffcbfe9

Please sign in to comment.