Skip to content

Commit

Permalink
fix(ingest/bigquery): Fix shard regexp to match without underscore as…
Browse files Browse the repository at this point in the history
… well (#8934)
  • Loading branch information
treff7es authored Oct 12, 2023
1 parent 84bba4d commit dd418de
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
):
field.description = col.comment
schema_fields[idx] = field
break
else:
tags = []
if col.is_partition_column:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@

logger: logging.Logger = logging.getLogger(__name__)

_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = "((.+)[_$])?(\\d{8})$"
# Regexp for sharded tables.
# A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date.
# The regexp checks for valid dates in the suffix (e.g. 20200101, 20200229, 20201231) and if the date is not valid
# then it is not a sharded table.
_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
"((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$"
)


@dataclass(frozen=True, order=True)
Expand All @@ -40,7 +46,7 @@ class BigqueryTableIdentifier:
_BQ_SHARDED_TABLE_SUFFIX: str = "_yyyymmdd"

@staticmethod
def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]:
def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]:
"""
Args:
table_name:
Expand All @@ -53,16 +59,25 @@ def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]:
In case of non-sharded tables, returns (<table-id>, None)
In case of sharded tables, returns (<table-prefix>, shard)
"""
new_table_name = table_name
match = re.match(
BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
table_name,
re.IGNORECASE,
)
if match:
table_name = match.group(2)
shard = match.group(3)
return table_name, shard
return table_name, None
shard: str = match[3]
if shard:
if table_name.endswith(shard):
new_table_name = table_name[: -len(shard)]

new_table_name = (
new_table_name.rstrip("_") if new_table_name else new_table_name
)
if new_table_name.endswith("."):
new_table_name = table_name
return (new_table_name, shard) if new_table_name else (None, shard)
return new_table_name, None

@classmethod
def from_string_name(cls, table: str) -> "BigqueryTableIdentifier":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class BigqueryQuery:
p.max_partition_id,
p.active_billable_bytes,
p.long_term_billable_bytes,
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base
REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
Expand Down Expand Up @@ -92,8 +92,8 @@ class BigqueryQuery:
tos.OPTION_VALUE as comment,
t.is_insertable_into,
t.ddl,
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base
REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

from datahub.configuration.common import ConfigModel, ConfigurationError

_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = "((.+)[_$])?(\\d{8})$"
# Regexp for sharded tables.
# A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date.
# The regexp checks for valid dates in the suffix (e.g. 20200101, 20200229, 20201231) and if the date is not valid
# then it is not a sharded table.
_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = (
"((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$"
)


class BigQueryBaseConfig(ConfigModel):
Expand Down
10 changes: 6 additions & 4 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,11 +765,14 @@ def test_gen_view_dataset_workunits(
("project.dataset.table_20231215", "project.dataset.table", "20231215"),
("project.dataset.table_2023", "project.dataset.table_2023", None),
# incorrectly handled special case where dataset itself is a sharded table if full name is specified
("project.dataset.20231215", "project.dataset.20231215", None),
("project.dataset.20231215", "project.dataset.20231215", "20231215"),
("project1.dataset2.20231215", "project1.dataset2.20231215", "20231215"),
# Cases with Just the table name as input
("table", "table", None),
("table20231215", "table20231215", None),
("table20231215", "table", "20231215"),
("table_20231215", "table", "20231215"),
("table2_20231215", "table2", "20231215"),
("table220231215", "table220231215", None),
("table_1624046611000_name", "table_1624046611000_name", None),
("table_1624046611000", "table_1624046611000", None),
# Special case where dataset itself is a sharded table
Expand Down Expand Up @@ -801,7 +804,6 @@ def test_get_table_and_shard_default(
("project.dataset.2023", "project.dataset.2023", None),
# Cases with Just the table name as input
("table", "table", None),
("table20231215", "table20231215", None),
("table_20231215", "table", "20231215"),
("table_2023", "table", "2023"),
("table_1624046611000_name", "table_1624046611000_name", None),
Expand Down Expand Up @@ -842,7 +844,7 @@ def test_get_table_and_shard_custom_shard_pattern(
"project.dataset.table_1624046611000_name",
),
("project.dataset.table_1624046611000", "project.dataset.table_1624046611000"),
("project.dataset.table20231215", "project.dataset.table20231215"),
("project.dataset.table20231215", "project.dataset.table"),
("project.dataset.table_*", "project.dataset.table"),
("project.dataset.table_2023*", "project.dataset.table"),
("project.dataset.table_202301*", "project.dataset.table"),
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ def test_bigquery_table_sanitasitation():
assert new_table_ref.dataset == "dataset-4567"

table_ref = BigQueryTableRef(
BigqueryTableIdentifier("project-1234", "dataset-4567", "foo_20222110")
BigqueryTableIdentifier("project-1234", "dataset-4567", "foo_20221210")
)
new_table_identifier = table_ref.table_identifier
assert new_table_identifier.table == "foo_20222110"
assert new_table_identifier.table == "foo_20221210"
assert new_table_identifier.is_sharded_table()
assert new_table_identifier.get_table_display_name() == "foo"
assert new_table_identifier.project_id == "project-1234"
Expand Down

0 comments on commit dd418de

Please sign in to comment.