diff --git a/src/parsers.py b/src/parsers.py index 9f6fb26..7a7cdda 100644 --- a/src/parsers.py +++ b/src/parsers.py @@ -92,61 +92,65 @@ def _parse_yaml(self) -> DAGConfig: """Processes the config file in order to instantiate the DAG in Airflow. """ - with open(self.filepath, 'r') as file: + with open(self.filepath, "r") as file: dag_config_dict = yaml.safe_load(file) # Mandatory fields - dag = self._try_get(dag_config_dict, 'dag') - dag_id = self._try_get(dag, 'id') - description = self._try_get(dag, 'description') - report = self._try_get(dag, 'report') - search = self._try_get(dag, 'search') - terms, sql, conn_id = self._get_terms_params(search) + dag = self._try_get(dag_config_dict, "dag") + dag_id = self._try_get(dag, "id") + description = self._try_get(dag, "description") + report = self._try_get(dag, "report") + search = self._try_get(dag, "search") + + search_dict = {} + if isinstance(search, dict): + for key, subsearch in search.items(): + search_dict[key] = {} + search_dict[key]["sources"] = search.get("sources", ["DOU"]) + ( + search_dict[key]["terms"], + search_dict[key]["sql"], + search_dict[key]["conn"], + ) = self._get_terms_params(subsearch) + search_dict[key]["territory_id"] = subsearch.get("territory_id", None) + search_dict[key]["dou_sections"] = subsearch.get( + "dou_sections", ["TODOS"] + ) + search_dict[key]["search_date"] = subsearch.get("date", "DIA") + search_dict[key]["field"] = subsearch.get("field", "TUDO") + search_dict[key]["is_exact_search"] = subsearch.get( + "is_exact_search", True + ) + search_dict[key]["ignore_signature_match"] = subsearch.get( + "ignore_signature_match", False + ) + search_dict[key]["force_rematch"] = subsearch.get("force_rematch", None) + search_dict[key]["full_text"] = subsearch.get("full_text", None) + search_dict[key]["department"] = subsearch.get("department", None) # Optional fields - owner = ", ".join(dag.get('owner', [])) - sources = search.get('sources', ['DOU']) - discord_webhook = (report['discord']['webhook'] - if report.get('discord') else None) - slack_webhook = (report['slack']['webhook'] - if report.get('slack') else None) - territory_id = search.get('territory_id', None) - dou_sections = search.get('dou_sections', ['TODOS']) - search_date = search.get('date', 'DIA') - field = search.get('field', 'TUDO') - is_exact_search = search.get('is_exact_search', True) - ignore_signature_match = search.get('ignore_signature_match', False) - force_rematch = search.get('force_rematch', None) - full_text = search.get('full_text', None) - department = search.get('department', None) + owner = ", ".join(dag.get("owner", [])) + discord_webhook = ( + report["discord"]["webhook"] if report.get("discord") else None + ) + slack_webhook = report["slack"]["webhook"] if report.get("slack") else None + schedule = self._get_safe_schedule(dag, self.DEFAULT_SCHEDULE) - doc_md = dag.get('doc_md', None) + doc_md = dag.get("doc_md", None) if doc_md: doc_md = textwrap.dedent(doc_md) - dag_tags = dag.get('tags', []) + dag_tags = dag.get("tags", []) # add default tags - dag_tags.append('dou') - dag_tags.append('generated_dag') - skip_null = report.get('skip_null', True) - emails = report.get('emails') - subject = report.get('subject', 'Extraçao do DOU') - attach_csv = report.get('attach_csv', False) + dag_tags.append("dou") + dag_tags.append("generated_dag") + skip_null = report.get("skip_null", True) + emails = report.get("emails") + subject = report.get("subject", "Extraçao do DOU") + attach_csv = report.get("attach_csv", False) return DAGConfig( dag_id=dag_id, - sources=sources, - territory_id=territory_id, - dou_sections=dou_sections, - search_date=search_date, - field=field, - is_exact_search=is_exact_search, - ignore_signature_match=ignore_signature_match, - force_rematch=force_rematch, - full_text=full_text, - terms=terms, - sql=sql, - conn_id=conn_id, - department=department, + search_dict=search_dict, emails=emails, subject=subject, attach_csv=attach_csv, @@ -158,7 +162,7 @@ def _parse_yaml(self) -> DAGConfig: doc_md=doc_md, dag_tags=set(dag_tags), owner=owner, - ) + ) def _get_terms_params(self, search) -> Tuple[List[str], str, str]: """Parses the `terms` config property handling different options.