From c64ec8af5a6a6cf3ff6c5b3ec0552d220139784d Mon Sep 17 00:00:00 2001 From: Mike Gouline <1960272+gouline@users.noreply.github.com> Date: Sun, 28 Aug 2022 22:31:59 +1000 Subject: [PATCH] fix: Consistent model selection in parsing (#141) --- dbtmetabase/__init__.py | 3 +- dbtmetabase/metabase.py | 33 ++++---- dbtmetabase/models/metabase.py | 6 +- dbtmetabase/parsers/dbt.py | 26 +++++-- dbtmetabase/parsers/dbt_folder.py | 116 +++++++++++++++------------- dbtmetabase/parsers/dbt_manifest.py | 110 ++++++++++++-------------- 6 files changed, 155 insertions(+), 139 deletions(-) diff --git a/dbtmetabase/__init__.py b/dbtmetabase/__init__.py index 18bafdce..5b561955 100644 --- a/dbtmetabase/__init__.py +++ b/dbtmetabase/__init__.py @@ -724,8 +724,7 @@ def exposures( ) # Load models - dbt_models, aliases = dbt.read_models() - del aliases # Unused in this particular function + dbt_models, _ = dbt.read_models() # Instantiate Metabase interface metabase = MetabaseInterface( diff --git a/dbtmetabase/metabase.py b/dbtmetabase/metabase.py index ae9c5a5c..515c6b9d 100644 --- a/dbtmetabase/metabase.py +++ b/dbtmetabase/metabase.py @@ -1,25 +1,30 @@ -import re import json -import requests -import time -import yaml import os - +import re +import time from typing import ( - Sequence, - Optional, - Tuple, Iterable, - MutableMapping, - Union, List, Mapping, + MutableMapping, + Optional, + Sequence, + Tuple, + Union, ) -from dbtmetabase.models import exceptions +import requests +import yaml from .logger.logging import logger -from .models.metabase import MetabaseModel, MetabaseColumn, ModelType, NullValue +from .models import exceptions +from .models.metabase import ( + METABASE_MODEL_DEFAULT_SCHEMA, + MetabaseColumn, + MetabaseModel, + ModelType, + NullValue, +) class MetabaseClient: @@ -458,7 +463,9 @@ def build_metadata_lookups( table_schema = table.get("schema") # table["schema"] is null for bigquery datasets bigquery_schema = metadata.get("details", {}).get("dataset-id") - table_schema = (table_schema or bigquery_schema or "public").upper() + table_schema = ( + table_schema or bigquery_schema or METABASE_MODEL_DEFAULT_SCHEMA + ).upper() table_name = table["name"].upper() if schemas_to_exclude: diff --git a/dbtmetabase/models/metabase.py b/dbtmetabase/models/metabase.py index 06c08ed4..3ce2a412 100644 --- a/dbtmetabase/models/metabase.py +++ b/dbtmetabase/models/metabase.py @@ -1,7 +1,6 @@ from dataclasses import dataclass, field from enum import Enum - -from typing import Sequence, Optional, MutableMapping +from typing import MutableMapping, Optional, Sequence # Allowed metabase.* fields # Must be covered by MetabaseModel attributes @@ -16,6 +15,9 @@ "semantic_type", ] +# Default model schema (only schema in BigQuery) +METABASE_MODEL_DEFAULT_SCHEMA = "PUBLIC" + class ModelType(str, Enum): nodes = "nodes" diff --git a/dbtmetabase/parsers/dbt.py b/dbtmetabase/parsers/dbt.py index 2530e55d..99509880 100644 --- a/dbtmetabase/parsers/dbt.py +++ b/dbtmetabase/parsers/dbt.py @@ -1,9 +1,9 @@ from abc import ABCMeta, abstractmethod from os.path import expanduser -from typing import Optional, Mapping, MutableMapping, Iterable, Tuple, List +from typing import Iterable, List, Mapping, MutableMapping, Optional, Tuple from ..logger.logging import logger -from ..models.metabase import MetabaseModel, MetabaseColumn, NullValue +from ..models.metabase import MetabaseColumn, MetabaseModel, NullValue class DbtReader(metaclass=ABCMeta): @@ -31,11 +31,11 @@ def __init__( """ self.path = expanduser(path) - self.database = database - self.schema = schema - self.schema_excludes = schema_excludes - self.includes = includes - self.excludes = excludes + self.database = database.upper() if database else None + self.schema = schema.upper() if schema else None + self.schema_excludes = [x.upper() for x in schema_excludes or []] + self.includes = [x.upper() for x in includes or []] + self.excludes = [x.upper() for x in excludes or []] self.alias_mapping: MutableMapping = {} @abstractmethod @@ -46,6 +46,18 @@ def read_models( ) -> Tuple[List[MetabaseModel], MutableMapping]: pass + def model_selected(self, name: str) -> bool: + """Checks whether model passes inclusion/exclusion criteria. + + Args: + name (str): Model name. + + Returns: + bool: True if included, false otherwise. + """ + n = name.upper() + return n not in self.excludes and (not self.includes or n in self.includes) + def set_column_foreign_key( self, column: Mapping, diff --git a/dbtmetabase/parsers/dbt_folder.py b/dbtmetabase/parsers/dbt_folder.py index 310acc22..0d6a798c 100644 --- a/dbtmetabase/parsers/dbt_folder.py +++ b/dbtmetabase/parsers/dbt_folder.py @@ -1,15 +1,17 @@ import re -import yaml from pathlib import Path from typing import List, Mapping, MutableMapping, Optional, Tuple +import yaml + from ..logger.logging import logger from ..models.metabase import ( - MetabaseModel, + METABASE_COLUMN_META_FIELDS, + METABASE_MODEL_DEFAULT_SCHEMA, + METABASE_MODEL_META_FIELDS, MetabaseColumn, + MetabaseModel, ModelType, - METABASE_MODEL_META_FIELDS, - METABASE_COLUMN_META_FIELDS, ) from .dbt import DbtReader @@ -34,78 +36,86 @@ def read_models( list -- List of dbt models in Metabase-friendly format. """ - database = self.database - schema = self.schema - schema_excludes = self.schema_excludes - includes = self.includes - excludes = self.excludes - - if schema_excludes is None: - schema_excludes = [] - if includes is None: - includes = [] - if excludes is None: - excludes = [] - if schema is None: - schema = "public" - - # Args that allow API interface for both readers to be interchangeable while passing CI - del database, docs_url - mb_models: List[MetabaseModel] = [] + schema = self.schema or METABASE_MODEL_DEFAULT_SCHEMA + for path in (Path(self.path) / "models").rglob("*.yml"): with open(path, "r", encoding="utf-8") as stream: schema_file = yaml.safe_load(stream) - if schema_file is None: + if not schema_file: logger().warning("Skipping empty or invalid YAML: %s", path) continue + for model in schema_file.get("models", []): - name = model.get("alias", model["name"]) + model_name = model.get("alias", model["name"]).upper() + # Refs will still use file name -- this alias mapping is good for getting the right name in the database if "alias" in model: - self.alias_mapping[name] = model["name"] + self.alias_mapping[model_name] = model["name"].upper() + logger().info("\nProcessing model: %s", path) - if (not includes or name in includes) and (name not in excludes): - mb_models.append( - self._read_model( - model=model, - schema=schema.upper(), - model_type=ModelType.nodes, - include_tags=include_tags, - ) + + if not self.model_selected(model_name): + logger().debug( + "Skipping %s not included in includes or excluded by excludes", + model_name, ) - logger().debug(mb_models[-1].ref) + continue + + mb_models.append( + self._read_model( + model=model, + schema=schema, + model_type=ModelType.nodes, + include_tags=include_tags, + ) + ) + for source in schema_file.get("sources", []): - source_schema_name = source.get("schema", source["name"]) + source_schema_name = source.get("schema", source["name"]).upper() + if "{{" in source_schema_name and "}}" in source_schema_name: logger().warning( - "dbt Folder Reader cannot resolve jinja expressions- use the Manifest Reader instead." + "dbt folder reader cannot resolve Jinja expressions, defaulting to current schema" ) source_schema_name = schema - if source_schema_name.upper() != schema.upper(): + + elif source_schema_name != schema: + logger().debug( + "Skipping schema %s not in target schema %s", + source_schema_name, + schema, + ) continue + for model in source.get("tables", []): - name = model.get("identifier", model["name"]) + model_name = model.get("identifier", model["name"]).upper() + # These will be used to resolve our regex parsed source() references if "identifier" in model: - self.alias_mapping[name] = model["name"] + self.alias_mapping[model_name] = model["name"].upper() + logger().info( - "\nProcessing source: %s -- table: %s", path, name + "\nProcessing source: %s -- table: %s", path, model_name ) - if (not includes or name in includes) and ( - name not in excludes - ): - mb_models.append( - self._read_model( - model=model, - source=source["name"], - model_type=ModelType.sources, - schema=source_schema_name.upper(), - include_tags=include_tags, - ) + + if not self.model_selected(model_name): + logger().debug( + "Skipping %s not included in includes or excluded by excludes", + model_name, + ) + continue + + mb_models.append( + self._read_model( + model=model, + source=source["name"], + model_type=ModelType.sources, + schema=source_schema_name, + include_tags=include_tags, ) - logger().debug(mb_models[-1].ref) + ) return mb_models, self.alias_mapping @@ -168,7 +178,7 @@ def _read_column(self, column: Mapping, schema: str) -> MetabaseColumn: Arguments: column {dict} -- One dbt column to read. - schema {str} -- Schema as passed doen from CLI args or parsed from `source` + schema {str} -- Schema as passed down from CLI args or parsed from `source` Returns: dict -- One dbt column in Metabase-friendly format. diff --git a/dbtmetabase/parsers/dbt_manifest.py b/dbtmetabase/parsers/dbt_manifest.py index f114dc23..0a1ca00b 100644 --- a/dbtmetabase/parsers/dbt_manifest.py +++ b/dbtmetabase/parsers/dbt_manifest.py @@ -1,13 +1,14 @@ import json -from typing import List, Tuple, Mapping, Optional, MutableMapping +from typing import List, Mapping, MutableMapping, Optional, Tuple from ..logger.logging import logger from ..models.metabase import ( - MetabaseModel, + METABASE_COLUMN_META_FIELDS, + METABASE_MODEL_DEFAULT_SCHEMA, + METABASE_MODEL_META_FIELDS, MetabaseColumn, + MetabaseModel, ModelType, - METABASE_MODEL_META_FIELDS, - METABASE_COLUMN_META_FIELDS, ) from .dbt import DbtReader @@ -30,17 +31,8 @@ def read_models( list -- List of dbt models in Metabase-friendly format. """ - database = self.database - schema = self.schema - schema_excludes = self.schema_excludes - includes = [x.lower() for x in self.includes] if self.includes else [] - excludes = [x.lower() for x in self.excludes] if self.excludes else [] - manifest = {} - if schema_excludes is None: - schema_excludes = [] - mb_models: List[MetabaseModel] = [] with open(self.path, "r", encoding="utf-8") as manifest_file: @@ -48,6 +40,12 @@ def read_models( for _, node in manifest["nodes"].items(): model_name = node["name"].upper() + model_schema = node["schema"].upper() + model_database = node["database"].upper() + + if node["resource_type"] != "model": + logger().debug("Skipping %s not of resource type model", model_name) + continue if node["config"]["materialized"] == "ephemeral": logger().debug( @@ -55,44 +53,33 @@ def read_models( ) continue - if node["database"].upper() != database.upper(): - # Skip model not associated with target database + if model_database != self.database: logger().debug( "Skipping %s in database %s, not in target database %s", model_name, - node["database"], - database, + model_database, + self.database, ) continue - if node["resource_type"] != "model": - # Target only model nodes - logger().debug("Skipping %s not of resource type model", model_name) - continue - - if schema and node["schema"].upper() != schema.upper(): - # Skip any models not in target schema + if self.schema and model_schema != self.schema: logger().debug( "Skipping %s in schema %s not in target schema %s", model_name, - node["schema"], - schema, + model_schema, + self.schema, ) continue - if schema_excludes and node["schema"].upper() in schema_excludes: - # Skip any model in a schema marked for exclusion + if model_schema in self.schema_excludes: logger().debug( "Skipping %s in schema %s marked for exclusion", model_name, - node["schema"], + model_schema, ) continue - if (includes and model_name.lower() not in includes) or ( - model_name.lower() in excludes - ): - # Process only intersect of includes and excludes + if not self.model_selected(model_name): logger().debug( "Skipping %s not included in includes or excluded by excludes", model_name, @@ -111,46 +98,41 @@ def read_models( ) for _, node in manifest["sources"].items(): - model_name = node.get("identifier", node.get("name")).upper() + source_name = node.get("identifier", node.get("name")).upper() + source_schema = node["schema"].upper() + source_database = node["database"].upper() - if node["database"].upper() != database.upper(): - # Skip model not associated with target database - logger().debug( - "Skipping %s not in target database %s", model_name, database - ) + if node["resource_type"] != "source": + logger().debug("Skipping %s not of resource type source", source_name) continue - if node["resource_type"] != "source": - # Target only source nodes - logger().debug("Skipping %s not of resource type source", model_name) + if source_database != self.database: + logger().debug( + "Skipping %s not in target database %s", source_name, self.database + ) continue - if schema and node["schema"].upper() != schema.upper(): - # Skip any models not in target schema + if self.schema and source_schema != self.schema: logger().debug( "Skipping %s in schema %s not in target schema %s", - model_name, - node["schema"], - schema, + source_name, + source_schema, + self.schema, ) continue - if schema_excludes and node["schema"].upper() in schema_excludes: - # Skip any model in a schema marked for exclusion + if source_schema in self.schema_excludes: logger().debug( "Skipping %s in schema %s marked for exclusion", - model_name, - node["schema"], + source_name, + source_schema, ) continue - if (includes and model_name.lower() not in includes) or ( - model_name.lower() in excludes - ): - # Process only intersect of includes and excludes + if not self.model_selected(source_name): logger().debug( "Skipping %s not included in includes or excluded by excludes", - model_name, + source_name, ) continue @@ -188,8 +170,9 @@ def _read_model( dict -- One dbt model in Metabase-friendly format. """ - metabase_column: List[MetabaseColumn] = [] + metabase_columns: List[MetabaseColumn] = [] + schema = model["schema"].upper() unique_id = model["unique_id"] children = manifest["child_map"][unique_id] @@ -248,7 +231,7 @@ def _read_model( continue fk_target_schema = manifest[model_type][depends_on_id].get( - "schema", "public" + "schema", METABASE_MODEL_DEFAULT_SCHEMA ) fk_target_field = child["test_metadata"]["kwargs"]["field"].strip('"') @@ -258,9 +241,10 @@ def _read_model( } for _, column in model.get("columns", {}).items(): - metabase_column.append( + metabase_columns.append( self._read_column( column=column, + schema=schema, relationship=relationship_tests.get(column["name"]), ) ) @@ -290,9 +274,9 @@ def _read_model( return MetabaseModel( name=resolved_name, - schema=model["schema"].upper(), + schema=schema, description=description, - columns=metabase_column, + columns=metabase_columns, model_type=model_type, unique_id=unique_id, source=source, @@ -303,12 +287,14 @@ def _read_model( def _read_column( self, column: Mapping, + schema: str, relationship: Optional[Mapping], ) -> MetabaseColumn: """Reads one dbt column in Metabase-friendly format. Arguments: column {dict} -- One dbt column to read. + schema {str} -- Schema as passed down from CLI args or parsed from `source` relationship {Mapping, optional} -- Mapping of columns to their foreign key relationships Returns: @@ -328,7 +314,7 @@ def _read_column( metabase_column=metabase_column, table=relationship["fk_target_table"] if relationship else None, field=relationship["fk_target_field"] if relationship else None, - schema=self.schema, + schema=schema, ) return metabase_column