diff --git a/src/hooks/inlabs_hook.py b/src/hooks/inlabs_hook.py
index 19c6b7b..aa6efee 100644
--- a/src/hooks/inlabs_hook.py
+++ b/src/hooks/inlabs_hook.py
@@ -4,13 +4,14 @@
 import os
 import logging
 import re
-from datetime import datetime
+from datetime import datetime, timedelta
 import time
 import json
 from collections.abc import Iterator
 import unicodedata
 import math
 import requests
+from requests import Response
 import html2text
 
 from airflow.hooks.base import BaseHook
@@ -48,7 +49,6 @@ def search_text(self, search_terms: dict, ignore_signature_match: bool) -> dict:
             dict: A dictionary of processed search results.
         """
 
-        headers = {"Content-Type": "application/json"}
         response = []
 
         text_terms = search_terms["texto"]
@@ -64,16 +64,20 @@ def search_text(self, search_terms: dict, ignore_signature_match: bool) -> dict:
             retries = 0
             while retries < self.MAX_RETRIES:
                 try:
-                    r = requests.post(
-                        self.DOU_API_URL,
-                        headers=headers,
-                        data=json.dumps(search_terms),
-                        timeout=45,
+                    r = self._request_api_data(search_terms)
+                    r.raise_for_status()
+                    response.extend(r.json())
+
+                    r = self._request_api_data(
+                        self._adapt_search_terms_to_extra(search_terms)
                     )
                     r.raise_for_status()
                     response.extend(r.json())
                     break
-                except (requests.exceptions.ReadTimeout, requests.exceptions.HTTPError) as e:
+                except (
+                    requests.exceptions.ReadTimeout,
+                    requests.exceptions.HTTPError,
+                ) as e:
                     retries += 1
                     logging.info(
                         "Timeout occurred, retrying %s of %s...",
@@ -95,19 +99,41 @@ def search_text(self, search_terms: dict, ignore_signature_match: bool) -> dict:
     @staticmethod
     def _iterate_in_chunks(lst: list, chunk_size: int) -> Iterator[list]:
         """Splits a list into chunks of a specified size.
-
-        Args:
-            lst (list): The list to be chunked.
-            chunk_size (int): The size of each chunk.
-
-        Yields:
-            Iterator[list]: A generator yielding chunks of
-            the original list.
         """
         for i in range(0, len(lst), chunk_size):
             yield lst[i : i + chunk_size]
 
+    def _request_api_data(self, payload: dict) -> Response:
+        """Posts the search payload to the DOU API and returns the raw response."""
+        headers = {"Content-Type": "application/json"}
+        post_result = requests.post(
+            self.DOU_API_URL,
+            headers=headers,
+            data=json.dumps(payload),
+            timeout=45,
+        )
+
+        logging.debug("DOU API response: %s", post_result.text)
+
+        return post_result
+
+    @staticmethod
+    def _adapt_search_terms_to_extra(payload: dict) -> dict:
+        """Shifts pub_date one day back and appends the Extra edition suffix "E"."""
+        payload = dict(payload)  # work on a copy so the caller's terms stay unchanged
+        payload["pub_date"] = [
+            (datetime.strptime(date, "%Y-%m-%d") - timedelta(days=1)).strftime(
+                "%Y-%m-%d"
+            )
+            for date in payload["pub_date"]
+        ]
+        payload["pub_name"] = [
+            s if s.endswith("E") else s + "E" for s in payload["pub_name"]
+        ]
+
+        return payload
+
 
 class TextDictHandler:
     """Handles the transformation and organization of text search results
     from the DOU API.
@@ -149,6 +175,7 @@ def transform_search_results(
                 "hierarchyList": None,
             }
 
+            # Look for occurrences of the search terms in the item's abstract
             matches = self._find_matches(item["abstract"], text_terms)
             if matches and item["title"]:
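
For context, here is a minimal runnable sketch (not part of the patch) of the payload transformation that `_adapt_search_terms_to_extra` performs. The local `adapt_to_extra` function and the sample values are hypothetical; only the `pub_date`/`pub_name` keys and their formats come from the change above.

```python
from datetime import datetime, timedelta


def adapt_to_extra(payload: dict) -> dict:
    """Mirror of the helper's logic, for illustration only."""
    adapted = dict(payload)
    # Shift each publication date one day back.
    adapted["pub_date"] = [
        (datetime.strptime(d, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
        for d in adapted["pub_date"]
    ]
    # Append the "E" (Extra edition) suffix where it is missing.
    adapted["pub_name"] = [
        s if s.endswith("E") else s + "E" for s in adapted["pub_name"]
    ]
    return adapted


print(adapt_to_extra({"pub_date": ["2024-05-10"], "pub_name": ["DO1"]}))
# -> {'pub_date': ['2024-05-09'], 'pub_name': ['DO1E']}
```

In effect, `search_text` now issues a second request per chunk whose terms target the Extra ("E") edition dated one day before the original `pub_date`, and extends the same `response` list with both result sets.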