Skip to content

Commit

Permalink
feat(ingest): add YamlFileUpdater utility (datahub-project#8266)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jun 29, 2023
1 parent a679e66 commit 08d4e90
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 5 deletions.
17 changes: 12 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

logger = logging.getLogger(__name__)

SearchFilterRule = Dict[str, Any]


class DatahubClientConfig(ConfigModel):
"""Configuration class for holding connectivity to datahub gms"""
Expand Down Expand Up @@ -538,8 +540,9 @@ def get_urns_by_filter(
query: Optional[str] = None,
status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 10000,
extraFilters: Optional[List[SearchFilterRule]] = None,
) -> Iterable[str]:
"""Fetch all urns that match the given filters.
"""Fetch all urns that match all of the given filters.
Filters are combined conjunctively. If multiple filters are specified, the results will match all of them.
Note that specifying a platform filter will automatically exclude all entity types that do not have a platform.
Expand All @@ -549,6 +552,7 @@ def get_urns_by_filter(
:param platform: Platform to filter on. If None, all platforms will be returned.
:param env: Environment (e.g. PROD, DEV) to filter on. If None, all environments will be returned.
:param status: Filter on the deletion status of the entity. The default is only return non-soft-deleted entities.
:param extraFilters: Additional filters to apply. If specified, the results will match all of the filters.
"""

types: Optional[List[str]] = None
Expand All @@ -561,8 +565,7 @@ def get_urns_by_filter(
# Add the query default of * if no query is specified.
query = query or "*"

FilterRule = Dict[str, Any]
andFilters: List[FilterRule] = []
andFilters: List[SearchFilterRule] = []

# Platform filter.
if platform:
Expand Down Expand Up @@ -602,14 +605,18 @@ def get_urns_by_filter(
else:
raise ValueError(f"Invalid status filter: {status}")

orFilters: List[Dict[str, List[FilterRule]]] = [{"and": andFilters}]
# Extra filters.
if extraFilters:
andFilters += extraFilters

orFilters: List[Dict[str, List[SearchFilterRule]]] = [{"and": andFilters}]

# Env filter.
if env:
# The env filter is a bit more tricky since it's not always stored
# in the same place in ElasticSearch.

envOrConditions: List[FilterRule] = [
envOrConditions: List[SearchFilterRule] = [
# For most entity types, we look at the origin field.
{
"field": "origin",
Expand Down
35 changes: 35 additions & 0 deletions metadata-ingestion/src/datahub/utilities/yaml_sync_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import contextlib
import pathlib
from typing import Any, Iterator

import ruamel.yaml.util
from ruamel.yaml import YAML


@contextlib.contextmanager
def YamlFileUpdater(file: pathlib.Path) -> Iterator[Any]:
yaml = YAML()
yaml.preserve_quotes = True # type: ignore[assignment]

doc = yaml.load(file)

# All the user to make changes to the doc.
# TODO: Enable replacing the doc entirely.
yield doc

# Guess existing indentation in the file so that we can preserve it.
_, ind, bsi = ruamel.yaml.util.load_yaml_guess_indent(file.read_text())
yaml.width = 2**20 # type: ignore[assignment]

yaml.sequence_indent = ind
yaml.block_seq_indent = bsi

if (ind, bsi) == (4, 2):
# (2, 4, 2) is much more common than (4, 4, 2).
yaml.map_indent = 2 # type: ignore[assignment]
else:
# TODO: Some folks use a different mapping indent than sequence indent.
# We should support that, but for now, we just use the sequence indent.
yaml.map_indent = ind

yaml.dump(doc, file)
91 changes: 91 additions & 0 deletions metadata-ingestion/tests/unit/utilities/test_yaml_sync_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pathlib

from datahub.utilities.yaml_sync_utils import YamlFileUpdater


def test_update_yaml_file(tmp_path: pathlib.Path) -> None:
infile = tmp_path / "test.yml"

# Note - this will drop the leading newline before the comment.
infile.write_text(
"""
# this is a comment
#
obj:
key1: value1
list_ty:
- foo
- key1: value1
key2: value2
"""
)
# ind=4, bsi=2

with YamlFileUpdater(infile) as doc:
doc["foo"] = "bar"
doc["list_ty"].append("baz")
doc["list_ty"][1]["key1.5"] = "val1.5"

assert (
infile.read_text()
== """# this is a comment
#
obj:
key1: value1
list_ty:
- foo
- key1: value1
key2: value2
key1.5: val1.5
- baz
foo: bar
"""
)


def test_indentation_inference(tmp_path: pathlib.Path) -> None:
infile = tmp_path / "test.yml"

infile.write_text(
"""
# this is a comment
#
obj:
key1: value1
list_ty:
- foo
- key1: value1
key2: value2
"""
)
# ind=2, bsi=0

with YamlFileUpdater(infile) as doc:
doc["foo"] = "bar"

assert (
infile.read_text()
== """# this is a comment
#
obj:
key1: value1
list_ty:
- foo
- key1: value1
key2: value2
foo: bar
"""
)


# TODO: This yaml indentation will fail, because the mapping indent is 2 but the sequence indent is 4.
"""
x:
y:
- b: 1
- 2
"""

0 comments on commit 08d4e90

Please sign in to comment.