Commit 6f4141e

Merge branch 'master' of https://github.com/Zenlytic/metrics_layer into feature/polished-group-by-filters

pblankley committed Oct 1, 2024
2 parents 8f309dd + 3805643
Showing 45 changed files with 3,418 additions and 1,474 deletions.
52 changes: 43 additions & 9 deletions metrics_layer/cli/seeding.py
@@ -158,6 +158,26 @@ def __init__(
            "NUMERIC": "number",
            "STRING": "string",
        }
+        self._trino_type_lookup = {
+            "boolean": "yesno",
+            "tinyint": "number",
+            "smallint": "number",
+            "integer": "number",
+            "int": "number",
+            "bigint": "number",
+            "real": "number",
+            "double": "number",
+            "decimal": "number",
+            "varchar": "string",
+            "char": "string",
+            "varbinary": "string",
+            "json": "string",
+            "date": "date",
+            "timestamp": "timestamp",
+            "timestamp(p)": "timestamp",
+            "timestamp with time zone": "timestamp",
+            "timestamp(p) with time zone": "timestamp",
+        }

    def seed(self, auto_tag_searchable_fields: bool = False):
        from metrics_layer.core.parse import ProjectDumper, ProjectLoader
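For context, this lookup is consumed the same way as the other warehouse lookups in `make_fields`, with `"string"` as the fallback for unmapped types. Below is a standalone sketch of that dispatch; the parameter-stripping step is an assumption for illustration (the literal `"timestamp(p)"` keys suggest parameterized types may arrive from the warehouse), while the PR itself does a direct `.get(DATA_TYPE, "string")`.

```python
# Standalone sketch, not code from the PR.
import re

trino_type_lookup = {
    "boolean": "yesno",
    "bigint": "number",
    "decimal": "number",
    "varchar": "string",
    "date": "date",
    "timestamp": "timestamp",
}

def metrics_layer_type(data_type: str) -> str:
    # Assumed normalization: "decimal(10,2)" -> "decimal".
    base = re.sub(r"\(.*\)$", "", data_type.strip().lower())
    # Unmapped types default to "string", mirroring the .get(..., "string") calls.
    return trino_type_lookup.get(base, "string")

print(metrics_layer_type("DECIMAL(10,2)"))  # number
print(metrics_layer_type("uuid"))           # string (fallback)
```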
@@ -246,8 +266,13 @@ def seed(self, auto_tag_searchable_fields: bool = False):
        dumper.dump_yaml_file(project_data, zenlytic_project_path)

    def get_model_name(self, current_models: list):
-        if len(current_models) > 0:
+        if len(current_models) == 1:
            return current_models[0].name
+        elif len(current_models) > 1:
+            for model in current_models:
+                if self.connection and model.connection == self.connection.name:
+                    return model.name
+            raise ValueError("Multiple models found, but none match the connection name")
        return self.default_model_name

    def make_models(self):
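The rewritten `get_model_name` changes the multi-model case: instead of always returning the first model, it now matches on the connection name and fails loudly when nothing matches. A hedged sketch of the three paths, using stand-in objects rather than the real model classes:

```python
# Stand-in objects for illustration only; the real Model and connection
# classes live elsewhere in metrics_layer.
from types import SimpleNamespace

current_models = [
    SimpleNamespace(name="commerce", connection="snowflake_prod"),
    SimpleNamespace(name="events", connection="trino_prod"),
]
connection = SimpleNamespace(name="trino_prod")

if len(current_models) == 1:
    result = current_models[0].name
elif len(current_models) > 1:
    # Pick the model whose connection matches, or raise like the PR does.
    result = next((m.name for m in current_models if m.connection == connection.name), None)
    if result is None:
        raise ValueError("Multiple models found, but none match the connection name")
else:
    result = "default_model"  # stands in for self.default_model_name

print(result)  # "events"
```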
@@ -287,7 +312,7 @@ def make_view(
            sql_table_name = f"{schema_name}.{table_name}"
            if self._database_is_not_default:
                sql_table_name = f"{self.database}.{sql_table_name}"
-        elif self.connection.type == Definitions.druid:
+        elif self.connection.type in {Definitions.druid, Definitions.trino}:
            sql_table_name = f"{schema_name}.{table_name}"
        elif self.connection.type == Definitions.bigquery:
            sql_table_name = f"`{self.database}.{schema_name}.{table_name}`"
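The net effect of the new branch: Trino tables, like Druid ones, are addressed as `schema.table` with the catalog implied by the connection, while the other warehouses keep their three-part or backtick-quoted forms. A quick illustration with made-up names:

```python
# Illustration only; shows the sql_table_name shapes produced by the branches above.
database, schema_name, table_name = "analytics", "public", "orders"

print(f"{database}.{schema_name}.{table_name}")    # e.g. Snowflake with a non-default database
print(f"{schema_name}.{table_name}")               # Druid and now Trino: catalog set on the connection
print(f"`{database}.{schema_name}.{table_name}`")  # BigQuery: quoted as a single identifier
```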
@@ -337,14 +362,20 @@ def make_fields(self, column_data, schema_name: str, table_name: str, auto_tag_s
                metrics_layer_type = self._sql_server_type_lookup.get(row["DATA_TYPE"], "string")
            elif self.connection.type == Definitions.databricks:
                metrics_layer_type = self._databricks_type_lookup.get(row["DATA_TYPE"], "string")
+            elif self.connection.type == Definitions.trino:
+                metrics_layer_type = self._trino_type_lookup.get(row["DATA_TYPE"], "string")
            else:
                raise NotImplementedError(f"Unknown connection type: {self.connection.type}")
            # Add quotes for certain db only because we've seen issues with column names with special chars
            if self.connection.type in {
                Definitions.druid,
+                Definitions.trino,
                Definitions.snowflake,
                Definitions.duck_db,
                Definitions.postgres,
                Definitions.redshift,
                Definitions.sql_server,
                Definitions.azure_synapse,
            }:
                column_name = '"' + row["COLUMN_NAME"] + '"'
            else:
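The double-quoting matters because seeded column names can contain characters that are not legal in a bare SQL identifier. A small sketch with a hypothetical column name:

```python
# Hypothetical column name with spaces and punctuation; unquoted it would not
# parse, but double-quoted it round-trips on the warehouses listed above.
row = {"COLUMN_NAME": "Order Total ($)"}
column_name = '"' + row["COLUMN_NAME"] + '"'
print(f"SELECT {column_name} FROM public.orders")
# SELECT "Order Total ($)" FROM public.orders
```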
@@ -419,6 +450,9 @@ def column_cardinalities_query(
        if self.connection.type in (Definitions.snowflake, Definitions.duck_db, Definitions.druid):
            quote_column_name = f'"{column_name}"' if quote else column_name
            query = f'APPROX_COUNT_DISTINCT( {quote_column_name} ) as "{column_name_alias}_cardinality"'  # noqa: E501
+        elif self.connection.type in {Definitions.trino}:
+            quote_column_name = f'"{column_name}"' if quote else column_name
+            query = f'APPROX_DISTINCT( {quote_column_name} ) as "{column_name_alias}_cardinality"'  # noqa: E501
        elif self.connection.type in {Definitions.redshift, Definitions.postgres}:
            quote_column_name = f'"{column_name}"' if quote else column_name
            query = (
@@ -455,12 +489,12 @@ def column_cardinalities_query(
            Definitions.databricks,
        }:
            query += f" FROM {self.database}.{schema_name}.{table_name}"
-        elif self.connection.type == Definitions.druid:
+        elif self.connection.type in {Definitions.druid, Definitions.trino}:
            query += f" FROM {schema_name}.{table_name}"
        elif self.connection.type == Definitions.bigquery:
            query += f" FROM `{self.database}`.`{schema_name}`.`{table_name}`"

-        return query + ";" if self.connection.type != Definitions.druid else query
+        return query + ";" if self.connection.type not in Definitions.no_semicolon_warehouses else query

    def columns_query(self):
        if self.connection.type in {Definitions.snowflake, Definitions.databricks}:
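Trino exposes `approx_distinct` rather than Snowflake's `APPROX_COUNT_DISTINCT`, which is why the new branch emits a different function name. Piecing the fragments above together by hand, the Trino path would assemble something like the following (identifiers invented; the surrounding `SELECT` assembly is assumed, and there is no trailing semicolon because `TRINO` is in `no_semicolon_warehouses`):

```python
# Hand-assembled from the branches above, with invented identifiers.
column_name, alias = "status", "status"
schema_name, table_name = "public", "orders"

select_item = f'APPROX_DISTINCT( "{column_name}" ) as "{alias}_cardinality"'
sql = f"SELECT {select_item} FROM {schema_name}.{table_name}"
print(sql)
# SELECT APPROX_DISTINCT( "status" ) as "status_cardinality" FROM public.orders
```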
@@ -483,7 +517,7 @@ def columns_query(self):
                query += f"INFORMATION_SCHEMA.COLUMNS"
            else:
                query += f"{self.database}.INFORMATION_SCHEMA.COLUMNS"
-        elif self.connection.type == Definitions.druid:
+        elif self.connection.type in {Definitions.druid, Definitions.trino}:
            query = (
                "SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE "
                "FROM INFORMATION_SCHEMA.COLUMNS"
@@ -507,7 +541,7 @@ def columns_query(self):
        if self.connection.type == Definitions.snowflake:
            # 10k columns is a reasonable max for a single table
            return query + " LIMIT 10000;"
-        return query + ";" if self.connection.type != Definitions.druid else query
+        return query + ";" if self.connection.type not in Definitions.no_semicolon_warehouses else query

    def table_query(self):
        if self.database and self.connection.type == Definitions.snowflake:
@@ -518,12 +552,12 @@ def table_query(self):
                "row_count as table_row_count, comment as comment "
                f"FROM {self.database}.INFORMATION_SCHEMA.TABLES"
            )
-        elif self.connection.type in {Definitions.druid}:
+        elif self.connection.type in {Definitions.druid, Definitions.trino}:
            query = (
                "SELECT TABLE_CATALOG as table_database, TABLE_SCHEMA as table_schema, "
                "TABLE_NAME as table_name, TABLE_TYPE as table_type "
                "FROM INFORMATION_SCHEMA.TABLES "
-                "WHERE TABLE_SCHEMA not in ('sys', 'INFORMATION_SCHEMA')"
+                "WHERE TABLE_SCHEMA not in ('sys', 'INFORMATION_SCHEMA', 'information_schema')"
            )
        elif self.database and self.connection.type in {
            Definitions.redshift,
@@ -558,7 +592,7 @@ def table_query(self):
            )
        else:
            raise ValueError("You must specify at least a database for seeding")
-        return query + ";" if self.connection.type != Definitions.druid else query
+        return query + ";" if self.connection.type not in Definitions.no_semicolon_warehouses else query

    def run_query(self, query: str):
        if self.run_query_override:
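The same terminator expression now appears in `column_cardinalities_query`, `columns_query`, and `table_query`. A small helper along these lines (hypothetical, not part of the diff) would capture the rule in one place: Druid and Trino reject trailing semicolons, every other warehouse gets one.

```python
# Hypothetical refactor sketch, not in the PR.
def finalize(query: str, connection_type: str) -> str:
    no_semicolon = {"DRUID", "TRINO"}  # mirrors Definitions.no_semicolon_warehouses
    return query if connection_type in no_semicolon else query + ";"

print(finalize("SELECT 1", "TRINO"))      # SELECT 1
print(finalize("SELECT 1", "SNOWFLAKE"))  # SELECT 1;
```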
9 changes: 9 additions & 0 deletions metrics_layer/core/model/base.py
@@ -2,6 +2,8 @@
import re
from typing import List

+from metrics_layer.core.exceptions import QueryError
+
NAME_REGEX = re.compile(r"([A-Za-z0-9\_]+)")

@@ -32,6 +34,13 @@ def name_error(entity_name: str, name: str):
            "the naming conventions (only letters, numbers, or underscores)"
        )

+    @staticmethod
+    def _raise_query_error_from_cte(field_name: str):
+        raise QueryError(
+            f"Field {field_name} is not present in either source query, so it"
+            " cannot be applied as a filter. Please add it to one of the source queries."
+        )
+
    @staticmethod
    def line_col(element):
        line = getattr(getattr(element, "lc", None), "line", None)
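The new helper gives merged-query filtering a single, consistent error. Assuming the enclosing class in `base.py` is `MetricsLayerBase` (an assumption; the call site lives in another file of this PR), usage would look like:

```python
# Assumed class name and call site, for illustration only.
from metrics_layer.core.exceptions import QueryError
from metrics_layer.core.model.base import MetricsLayerBase

try:
    MetricsLayerBase._raise_query_error_from_cte("orders.new_vs_repeat")
except QueryError as e:
    print(e)
# Field orders.new_vs_repeat is not present in either source query, so it
# cannot be applied as a filter. Please add it to one of the source queries.
```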
4 changes: 4 additions & 0 deletions metrics_layer/core/model/definitions.py
@@ -8,6 +8,7 @@ class Definitions:
    duck_db = "DUCK_DB"
    databricks = "DATABRICKS"
    azure_synapse = "AZURE_SYNAPSE"
+    trino = "TRINO"
    supported_warehouses = [
        snowflake,
        bigquery,
@@ -18,8 +19,11 @@
        duck_db,
        databricks,
        azure_synapse,
+        trino,
    ]
    symmetric_aggregates_supported_warehouses = [snowflake, redshift, bigquery, postgres, duck_db]
+    no_semicolon_warehouses = [druid, trino]
+    needs_datetime_cast = [bigquery, trino]
    supported_warehouses_text = ", ".join(supported_warehouses)

    does_not_exist = "__DOES_NOT_EXIST__"
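`needs_datetime_cast` pairs BigQuery with Trino, presumably because both engines require explicit casts in some datetime comparisons. The real call sites are elsewhere in the codebase; a hedged sketch of how such a flag is typically consumed:

```python
from metrics_layer.core.model.definitions import Definitions

def datetime_expr(column_sql: str, warehouse_type: str) -> str:
    # Assumption for illustration: wrap the column in an explicit cast for
    # warehouses listed in needs_datetime_cast.
    if warehouse_type in Definitions.needs_datetime_cast:
        return f"CAST({column_sql} AS TIMESTAMP)"
    return column_sql

print(datetime_expr("orders.created_at", Definitions.trino))      # CAST(orders.created_at AS TIMESTAMP)
print(datetime_expr("orders.created_at", Definitions.snowflake))  # orders.created_at
```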
