Skip to content

Commit

Permalink
feat: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
claudiazi committed Apr 26, 2024
1 parent 1c2ad82 commit 7e29420
Showing 1 changed file with 11 additions and 26 deletions.
37 changes: 11 additions & 26 deletions dagger/utilities/dbt_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,19 @@ def _load_file(file_path: str, file_type: str) -> dict:
_logger.error(f"File not found: {file_path}")
exit(1)

@abstractmethod
def _get_athena_table_task(
self, node: dict, follow_external_dependency: bool = False
) -> dict:
"""Generate an athena table task for a DBT node. Must be implemented by subclasses. This function should be deprecated after the source connects with databricks directly"""
pass
"""Generate an athena table task for a DBT node."""
task = ATHENA_TASK_BASE.copy()
if follow_external_dependency:
task["follow_external_dependency"] = True

task["schema"] = node.get("schema", self._default_schema)
task["table"] = node.get("name", "")
task["name"] = f"{task['schema']}__{task['table']}_athena"

return task

@abstractmethod
def _get_table_task(
Expand Down Expand Up @@ -173,7 +180,6 @@ def generate_dagger_io(self, model_name: str) -> Tuple[List[dict], List[dict]]:
inputs_list = []
model_node = self._nodes_in_manifest[f"model.main.{model_name}"]
parent_node_names = model_node.get("depends_on", {}).get("nodes", [])
print(f"parent node name: {parent_node_names}")

for parent_node_name in parent_node_names:
dagger_input = self._generate_dagger_tasks(parent_node_name)
Expand All @@ -187,8 +193,6 @@ def generate_dagger_io(self, model_name: str) -> Tuple[List[dict], List[dict]]:
).values()
)

print(unique_inputs)

return unique_inputs, output_list


Expand All @@ -209,20 +213,7 @@ def _get_table_task(
"""
Generates the dagger athena task for the DBT model node
"""
task = ATHENA_TASK_BASE.copy()
if follow_external_dependency:
task["follow_external_dependency"] = True

task["schema"] = node.get("schema", self._default_schema)
task["table"] = node.get("name", "")
task["name"] = f"{task['schema']}__{task['table']}_athena"

return task

def _get_athena_table_task(
self, node: dict, follow_external_dependency: bool = False
) -> dict:
return self._get_table_task(node, follow_external_dependency)
return self._get_athena_table_task(node, follow_external_dependency)

def _get_model_data_location(
self, node: dict, schema: str, model_name: str
Expand Down Expand Up @@ -286,7 +277,6 @@ def __init__(self, default_config_parameters: dict):
super().__init__(default_config_parameters)
self._profile_name = "databricks"
self._default_catalog = self._target_config.get("catalog")
self._athena_dbt_parser = AthenaDBTConfigParser(default_config_parameters)
self._create_external_athena_table = default_config_parameters.get(
"create_external_athena_table", False
)
Expand All @@ -310,11 +300,6 @@ def _get_table_task(

return task

def _get_athena_table_task(
self, node: dict, follow_external_dependency: bool = False
) -> dict:
return self._athena_dbt_parser._get_table_task(node, follow_external_dependency)

def _get_model_data_location(
self, node: dict, schema: str, model_name: str
) -> Tuple[str, str]:
Expand Down

0 comments on commit 7e29420

Please sign in to comment.