Skip to content

Commit

Permalink
add logic for subsearchs
Browse files Browse the repository at this point in the history
  • Loading branch information
edulauer committed May 13, 2024
1 parent 133d564 commit e926c56
Showing 1 changed file with 48 additions and 44 deletions.
92 changes: 48 additions & 44 deletions src/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,61 +92,65 @@ def _parse_yaml(self) -> DAGConfig:
"""Processes the config file in order to instantiate the DAG in
Airflow.
"""
with open(self.filepath, 'r') as file:
with open(self.filepath, "r") as file:
dag_config_dict = yaml.safe_load(file)

# Mandatory fields
dag = self._try_get(dag_config_dict, 'dag')
dag_id = self._try_get(dag, 'id')
description = self._try_get(dag, 'description')
report = self._try_get(dag, 'report')
search = self._try_get(dag, 'search')
terms, sql, conn_id = self._get_terms_params(search)
dag = self._try_get(dag_config_dict, "dag")
dag_id = self._try_get(dag, "id")
description = self._try_get(dag, "description")
report = self._try_get(dag, "report")
search = self._try_get(dag, "search")

search_dict = {}
if isinstance(search, dict):
for key, subsearch in search.items():
search_dict[key] = {}
search_dict[key]["sources"] = search.get("sources", ["DOU"])
(
search_dict[key]["terms"],
search_dict[key]["sql"],
search_dict[key]["conn"],
) = self._get_terms_params(subsearch)
search_dict[key]["territory_id"] = subsearch.get("territory_id", None)
search_dict[key]["dou_sections"] = subsearch.get(
"dou_sections", ["TODOS"]
)
search_dict[key]["search_date"] = subsearch.get("date", "DIA")
search_dict[key]["field"] = subsearch.get("field", "TUDO")
search_dict[key]["is_exact_search"] = subsearch.get(
"is_exact_search", True
)
search_dict[key]["ignore_signature_match"] = subsearch.get(
"ignore_signature_match", False
)
search_dict[key]["force_rematch"] = subsearch.get("force_rematch", None)
search_dict[key]["full_text"] = subsearch.get("full_text", None)
search_dict[key]["department"] = subsearch.get("department", None)

# Optional fields
owner = ", ".join(dag.get('owner', []))
sources = search.get('sources', ['DOU'])
discord_webhook = (report['discord']['webhook']
if report.get('discord') else None)
slack_webhook = (report['slack']['webhook']
if report.get('slack') else None)
territory_id = search.get('territory_id', None)
dou_sections = search.get('dou_sections', ['TODOS'])
search_date = search.get('date', 'DIA')
field = search.get('field', 'TUDO')
is_exact_search = search.get('is_exact_search', True)
ignore_signature_match = search.get('ignore_signature_match', False)
force_rematch = search.get('force_rematch', None)
full_text = search.get('full_text', None)
department = search.get('department', None)
owner = ", ".join(dag.get("owner", []))
discord_webhook = (
report["discord"]["webhook"] if report.get("discord") else None
)
slack_webhook = report["slack"]["webhook"] if report.get("slack") else None

schedule = self._get_safe_schedule(dag, self.DEFAULT_SCHEDULE)
doc_md = dag.get('doc_md', None)
doc_md = dag.get("doc_md", None)
if doc_md:
doc_md = textwrap.dedent(doc_md)
dag_tags = dag.get('tags', [])
dag_tags = dag.get("tags", [])
# add default tags
dag_tags.append('dou')
dag_tags.append('generated_dag')
skip_null = report.get('skip_null', True)
emails = report.get('emails')
subject = report.get('subject', 'Extraçao do DOU')
attach_csv = report.get('attach_csv', False)
dag_tags.append("dou")
dag_tags.append("generated_dag")
skip_null = report.get("skip_null", True)
emails = report.get("emails")
subject = report.get("subject", "Extraçao do DOU")
attach_csv = report.get("attach_csv", False)

return DAGConfig(
dag_id=dag_id,
sources=sources,
territory_id=territory_id,
dou_sections=dou_sections,
search_date=search_date,
field=field,
is_exact_search=is_exact_search,
ignore_signature_match=ignore_signature_match,
force_rematch=force_rematch,
full_text=full_text,
terms=terms,
sql=sql,
conn_id=conn_id,
department=department,
search_dict=search_dict,
emails=emails,
subject=subject,
attach_csv=attach_csv,
Expand All @@ -158,7 +162,7 @@ def _parse_yaml(self) -> DAGConfig:
doc_md=doc_md,
dag_tags=set(dag_tags),
owner=owner,
)
)

def _get_terms_params(self, search) -> Tuple[List[str], str, str]:
"""Parses the `terms` config property handling different options.
Expand Down

0 comments on commit e926c56

Please sign in to comment.