diff --git a/dbtmetabase/__init__.py b/dbtmetabase/__init__.py index cf790108..4b915365 100644 --- a/dbtmetabase/__init__.py +++ b/dbtmetabase/__init__.py @@ -8,7 +8,7 @@ def export(dbt_path: str, mb_host: str, mb_user: str, mb_password: str, database: str, schema: str, - sync = True, sync_timeout_secs = 30): + sync = True, sync_timeout = 30): """Exports models from dbt to Metabase. Arguments: @@ -21,14 +21,14 @@ def export(dbt_path: str, Keyword Arguments: sync {bool} -- Synchronize Metabase database before export. (default: {True}) - sync_timeout_secs {int} -- Synchronization timeout in seconds. (default: {30}) + sync_timeout {int} -- Synchronization timeout in seconds. (default: {30}) """ mbc = MetabaseClient(mb_host, mb_user, mb_password) models = DbtReader(dbt_path).read_models() if sync: - if not mbc.sync_and_wait(database, schema, models, sync_timeout_secs): + if not mbc.sync_and_wait(database, schema, models, sync_timeout): logging.critical("Sync timeout reached, models still not compatible") return @@ -39,4 +39,28 @@ def main(args: list = None): logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) - # TODO: argparse here + parser = argparse.ArgumentParser( + description='Model synchronization from dbt to Metabase.' + ) + parser.add_argument('command', choices=['export'], help="command to execute") + parser.add_argument('--dbt_path', metavar='PATH', required=True, help="path to dbt project") + parser.add_argument('--mb_host', metavar='HOST', required=True, help="Metabase hostname") + parser.add_argument('--mb_user', metavar='USER', required=True, help="Metabase username") + parser.add_argument('--mb_password', metavar='PASS', required=True, help="Metabase password") + parser.add_argument('--database', metavar='DB', required=True, help="target database name") + parser.add_argument('--schema', metavar='SCHEMA', required=True, help="target schema name") + parser.add_argument('--sync', metavar='ENABLE', type=bool, default=True, help="synchronize Metabase database before export") + parser.add_argument('--sync_timeout', metavar='SECS', type=int, default=30, help="synchronization timeout (in secs)") + parsed = parser.parse_args(args=args) + + if parsed.command == 'export': + export( + dbt_path=parsed.dbt_path, + mb_host=parsed.mb_host, + mb_user=parsed.mb_user, + mb_password=parsed.mb_password, + database=parsed.database, + schema=parsed.schema, + sync=parsed.sync, + sync_timeout=parsed.sync_timeout + ) diff --git a/dbtmetabase/scripts/dbtmetabase b/dbtmetabase/bin/dbt-metabase similarity index 100% rename from dbtmetabase/scripts/dbtmetabase rename to dbtmetabase/bin/dbt-metabase diff --git a/dbtmetabase/dbt.py b/dbtmetabase/dbt.py index 14f0f30c..c24ad428 100644 --- a/dbtmetabase/dbt.py +++ b/dbtmetabase/dbt.py @@ -2,11 +2,25 @@ import re class DbtReader: + """Reader for dbt project configuration. + """ def __init__(self, project_path: str): + """Constructor. + + Arguments: + project_path {str} -- Path to dbt project root. + """ + self.project_path = project_path def read_models(self) -> list: + """Reads dbt models in Metabase-friendly format. + + Returns: + list -- List of dbt models in Metabase-friendly format. + """ + mb_models = [] for path in (self.project_path / 'models').rglob('*.yml'): @@ -18,6 +32,15 @@ def read_models(self) -> list: return mb_models def read_model(self, model: dict) -> dict: + """Reads one dbt model in Metabase-friendly format. + + Arguments: + model {dict} -- One dbt model to read. + + Returns: + dict -- One dbt model in Metabase-friendly format. + """ + mb_columns = [] for column in model.get('columns', []): @@ -30,6 +53,15 @@ def read_model(self, model: dict) -> dict: } def read_column(self, column: dict) -> dict: + """Reads one dbt column in Metabase-friendly format. + + Arguments: + column {dict} -- One dbt column to read. + + Returns: + dict -- One dbt column in Metabase-friendly format. + """ + mb_column = { 'name': column.get('name', '').upper(), 'description': column.get('description') @@ -51,6 +83,15 @@ def read_column(self, column: dict) -> dict: @staticmethod def parse_ref(text: str) -> str: + """Parses dbt ref() statement. + + Arguments: + text {str} -- Full statement in dbt YAML. + + Returns: + str -- Name of the reference. + """ + matches = re.findall(r"ref\(['\"]([\w\_\-\ ]+)['\"]\)", text) if matches: return matches[0] diff --git a/dbtmetabase/metabase.py b/dbtmetabase/metabase.py index f5cc8599..bf98e89e 100644 --- a/dbtmetabase/metabase.py +++ b/dbtmetabase/metabase.py @@ -6,23 +6,57 @@ from typing import Any class MetabaseClient: + """Metabase API client. + """ _SYNC_PERIOD_SECS = 5 def __init__(self, host: str, user: str, password: str): + """Constructor. + + Arguments: + host {str} -- Metabase hostname. + user {str} -- Metabase username. + password {str} -- Metabase password. + """ + self.host = host self.session_id = self.get_session_id(user, password) logging.info("Session established successfully") def get_session_id(self, user: str, password: str) -> str: + """Obtains new session ID from API. + + Arguments: + user {str} -- Metabase username. + password {str} -- Metabase password. + + Returns: + str -- Session ID. + """ + return self.api('post', '/api/session', authenticated=False, json={ 'username': user, 'password': password })['id'] - def sync_and_wait(self, database: str, schema: str, models: list, timeout_secs = 30) -> bool: - if timeout_secs < self._SYNC_PERIOD_SECS: - logging.critical("Timeout provided %d secs, must be at least %d", timeout_secs, self._SYNC_PERIOD_SECS) + def sync_and_wait(self, database: str, schema: str, models: list, timeout = 30) -> bool: + """Synchronize with the database and wait for schema compatibility. + + Arguments: + database {str} -- Metabase database name. + schema {str} -- Metabase schema name. + models {list} -- List of dbt models read from project. + + Keyword Arguments: + timeout {int} -- Timeout before giving up in seconds. (default: {30}) + + Returns: + bool -- True if schema compatible with models, false if still incompatible. + """ + + if timeout < self._SYNC_PERIOD_SECS: + logging.critical("Timeout provided %d secs, must be at least %d", timeout, self._SYNC_PERIOD_SECS) return database_id = self.find_database_id(database) @@ -32,16 +66,29 @@ def sync_and_wait(self, database: str, schema: str, models: list, timeout_secs = self.api('post', f'/api/database/{database_id}/sync') + deadline = int(time.time()) + timeout sync_successful = False while True: sync_successful = self.models_compatible(database_id, schema, models) - if not sync_successful: # TODO and timeout budget not reached + time_after_wait = int(time.time()) + self._SYNC_PERIOD_SECS + if not sync_successful and time_after_wait <= deadline: time.sleep(self._SYNC_PERIOD_SECS) else: break return sync_successful def models_compatible(self, database_id: str, schema: str, models: list) -> bool: + """Checks if models compatible with the Metabase database schema. + + Arguments: + database_id {str} -- Metabase database ID. + schema {str} -- Metabase schema name. + models {list} -- List of dbt models read from project. + + Returns: + bool -- True if schema compatible with models, false otherwise. + """ + field_lookup = self.build_field_lookup(database_id, schema) for model in models: @@ -58,6 +105,14 @@ def models_compatible(self, database_id: str, schema: str, models: list) -> bool return True def export_models(self, database: str, schema: str, models: list): + """Exports dbt models to Metabase database schema. + + Arguments: + database {str} -- Metabase database name. + schema {str} -- Metabase schema name. + models {list} -- List of dbt models read from project. + """ + database_id = self.find_database_id(database) if not database_id: logging.critical("Cannot find database by name %s", database) @@ -70,6 +125,14 @@ def export_models(self, database: str, schema: str, models: list): self.export_model(model, table_lookup, field_lookup) def export_model(self, model: dict, table_lookup: dict, field_lookup: dict): + """Exports one dbt model to Metabase database schema. + + Arguments: + model {dict} -- One dbt model read from project. + table_lookup {dict} -- Dictionary of Metabase tables indexed by name. + field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name. + """ + model_name = model['name'].upper() api_table = table_lookup.get(model_name) @@ -89,7 +152,15 @@ def export_model(self, model: dict, table_lookup: dict, field_lookup: dict): for column in model.get('columns', []): self.export_column(model_name, column, field_lookup) - def export_column(self, model_name: str, column: dict, field_lookup: dict): + def export_column(self, model_name: str, column: dict, field_lookup: dict): + """Exports one dbt column to Metabase database schema. + + Arguments: + model_name {str} -- One dbt model name read from project. + column {dict} -- One dbt column read from project. + field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name. + """ + column_name = column['name'].upper() field = field_lookup.get(model_name, {}).get(column_name) @@ -119,12 +190,31 @@ def export_column(self, model_name: str, column: dict, field_lookup: dict): logging.info("Field %s.%s is up-to-date", model_name, column_name) def find_database_id(self, name: str) -> str: + """Finds Metabase database ID by name. + + Arguments: + name {str} -- Metabase database name. + + Returns: + str -- Metabase database ID. + """ + for database in self.api('get', '/api/database'): if database['name'].upper() == name.upper(): return database['id'] return None def build_table_lookup(self, database_id: str, schema: str) -> dict: + """Builds table lookup. + + Arguments: + database_id {str} -- Metabase database ID. + schema {str} -- Metabase schema name. + + Returns: + dict -- Dictionary of tables indexed by name. + """ + lookup = {} for table in self.api('get', f'/api/table'): @@ -137,6 +227,16 @@ def build_table_lookup(self, database_id: str, schema: str) -> dict: return lookup def build_field_lookup(self, database_id: str, schema: str) -> dict: + """Builds field lookup. + + Arguments: + database_id {str} -- Metabase database ID. + schema {str} -- Metabase schema name. + + Returns: + dict -- Dictionary of fields indexed by name, indexed by table name. + """ + lookup = {} for field in self.api('get', f'/api/database/{database_id}/fields'): @@ -156,6 +256,20 @@ def build_field_lookup(self, database_id: str, schema: str) -> dict: return lookup def api(self, method: str, path: str, authenticated = True, critical = True, **kwargs) -> Any: + """Unified way of calling Metabase API. + + Arguments: + method {str} -- HTTP verb, e.g. get, post, put. + path {str} -- Relative path of endpoint, e.g. /api/database. + + Keyword Arguments: + authenticated {bool} -- Includes session ID when true. (default: {True}) + critical {bool} -- Raise on any HTTP errors. (default: {True}) + + Returns: + Any -- JSON payload of the endpoint. + """ + headers = {} if 'headers' not in kwargs: kwargs['headers'] = headers @@ -171,3 +285,4 @@ def api(self, method: str, path: str, authenticated = True, critical = True, **k elif not response.ok: return False return json.loads(response.text) + diff --git a/setup.py b/setup.py index c106c143..819af53f 100644 --- a/setup.py +++ b/setup.py @@ -29,13 +29,8 @@ license='MIT License', packages=find_packages(exclude=['tests']), test_suite='tests', - entry_points={ - 'console_scripts': [ - 'dbtmetabase = dbtmetabase:main' - ] - }, scripts=[ - 'dbtmetabase/scripts/dbtmetabase' + 'dbtmetabase/bin/dbt-metabase' ], tests_require=test_requires, install_requires=requires,