diff --git a/src/viadot/orchestration/prefect/utils.py b/src/viadot/orchestration/prefect/utils.py index 2799beff5..b77c17deb 100644 --- a/src/viadot/orchestration/prefect/utils.py +++ b/src/viadot/orchestration/prefect/utils.py @@ -28,6 +28,443 @@ with contextlib.suppress(ModuleNotFoundError): from prefect_azure import AzureKeyVaultSecretReference +import re + +import pendulum + + +class DynamicDateHandler: + def __init__( + self, + dynamic_date_symbols: list[str] = ["<<", ">>"], # noqa: B006 + dynamic_date_format: str = "%Y%m%d", + dynamic_date_timezone: str = "Europe/Warsaw", + ): + """This class processes time-related patterns in the provided text. + + Replaces dynamic date markers with actual dates. + The supported patterns include: + - "today" + - "yesterday" + - "current_month" + - "last_month" + - "current_year" + - "last_year" + - "now_time" + - "last_day_previous_month" + - "last_day_of_month_year": e.g., "last_day_of_February_2020" + - "X_years_ago_year": e.g., "3_years_ago_year" ,refers to only the year + of the day X years ago + - "X_years/months/days_ago_full_date": e.g., "3_years_ago_full_date", + refers to a given date X units ago in dynamic_date_format + - "last_X_years/months/days": e.g., "last_10_months", refers to a data range + of the months in 'YMM' format + - "Y_years_from_X": e.g., "10_years_from_2020", refers to a data range + of the year numbers from a specified year + - "first_X_days_from_X": e.g., "first_10_days_of_January_2020", + returns a data range of days from a given month + + Args: + dynamic_date_symbols (list[str], optional): The symbols that mark + the start and the end of a dynamic date pattern in a text. + Defaults to ["<<", ">>"]. + dynamic_date_format (str, optional): A date and time format string + defining the text representation of date. Defaults to "%Y%m%d". + dynamic_date_timezone (str, optional): A string that sets the default + timezone used by all datetime functions. Defaults to "Europe/Warsaw". + """ + self.singular_patterns = { + "last_day_of_month": r"last_day_of_(\w+)_(\d{4})", + "x_units_ago_full_date": r"(\d+)_(years?|months?|days?)_ago_full_date", + "x_years_ago_year": r"(\d+)_years_ago_year", + } + self.range_patterns = { + "last_x_units": r"last_(\d+)_(years|months|days)", + "y_years_from_x": r"(\d+)_years_from_(\d{4})", + "first_x_days_from": r"first_(\d+)_days_from_(\w+)_(\d{4})", + "last_x_days_from": r"last_(\d+)_days_from_(\w+)_(\d{4})", + } + self.dynamic_date_format = dynamic_date_format + self.dynamic_date_timezone = dynamic_date_timezone + self.dynamic_date_symbols = dynamic_date_symbols + self.replacements = self._create_date_dict() + + def _generate_years( + self, last_years: int | None, from_year: str | None, num_years: str | None + ) -> list[str]: + """Generate a list of years either for the last X years or from a start year. + + Args: + last_years (int | None): The number of years to generate + from the current year. + from_year (str | None): The starting year. + num_years (int | None): The number of years to generate + from the starting year. + + Returns: + list: A list of years in ascending order. + """ + current_year = pendulum.now().year + if last_years: + return [str(current_year - i) for i in range(last_years)][ + ::-1 + ] # Reversed to ascending order + if from_year and num_years: + return [ + str(int(from_year) + i) for i in range(int(num_years)) + ] # Ascending order + + return [] + + def _generate_months(self, last_months: int) -> list[str]: + """Generate a list of first days of the last X months. + + Args: + last_months (int): The number of months to include from the past. + + Returns: + list: A list of dates representing the last X months in ascending order. + """ + current_date = pendulum.now() + + return [ + current_date.subtract(months=i).start_of("month").format("YMM") + for i in range(last_months) + ][::-1] # Reversed to ascending order + + def _generate_dates(self, last_days: int) -> list[str]: + """Generate a list of dates for the last X days. + + Args: + last_days (int): The number of days to include from the past. + + Returns: + list: A list of dates in ascending order. + """ + current_date = pendulum.now(self.dynamic_date_timezone) + + return [ + current_date.subtract(days=i).format("YMMDD") for i in range(last_days) + ][::-1] # Reversed to ascending order + + def _process_first_days( + self, month_name: str, year: int, num_days: int + ) -> list[str]: + """Generate a list of the first X days of a given month and year. + + Args: + month_name (str): The name of the month. + year (str): The year. + num_days (int): The number of days to include. + + Returns: + list: A list of dates for the first X days in ascending order. + """ + start_date = pendulum.datetime( + int(year), + pendulum.parse(month_name, strict=False).month, # type: ignore + 1, # type: ignore + ) + + return [ + start_date.add(days=i).format("YMMDD") for i in range(num_days) + ] # Ascending order + + def _process_last_days( + self, month_name: str, year: int, num_days: int + ) -> list[str]: + """Generate a list of the last X days of a given month and year. + + Args: + month_name (str): The name of the month. + year (str): The year. + num_days (int): The number of days to include. + + Returns: + list: A list of dates for the last X days in ascending order. + """ + start_date = pendulum.datetime( + int(year), + pendulum.parse(month_name, strict=False).month, # type: ignore + 1, # type: ignore + ) + end_date = start_date.end_of("month") + + return [end_date.subtract(days=i).format("YMMDD") for i in range(num_days)][ + ::-1 + ] # Reversed to ascending order + + def _process_last_day_of_month( + self, year: str, month_name: str + ) -> pendulum.DateTime: # type: ignore + """Retrieve the last day of a specified month and year. + + Args: + year (str): The year. + month_name (str): The name of the month. + + Returns: + pendulum.DateTime: A date object containing the last day of the given month. + """ + month_num = pendulum.parse(month_name, strict=False).month # type: ignore + + return pendulum.datetime(int(year), month_num, 1).end_of("month") + + def _process_x_years_ago(self, year: int) -> str: + """Retrieve the year of a date X years from now. + + Args: + year (int): The year. + + Returns: + str: A string containing the year of the specified time ago. + """ + current_date = pendulum.now() + + return current_date.subtract(years=year).format("Y") + + def _get_date_x_ago_full_date( + self, number: int, unit: str + ) -> pendulum.DateTime | None: # type: ignore + """Retrieve the full date for X units ago from today. + + Args: + number (int): The number of units (years, months, days). + unit (str): The unit of time ('years', 'months', 'days'). + + Returns: + pendulum.DateTime: A date for X units ago from today. + """ + return { + "years": pendulum.now(self.dynamic_date_timezone).subtract(years=number), + "months": pendulum.now(self.dynamic_date_timezone).subtract(months=number), + "days": pendulum.now(self.dynamic_date_timezone).subtract(days=number), + }.get(unit) + + def _create_date_dict(self) -> dict[str, str]: + """Create and return a key phrase: dynamic date value dictionary. + + Dictionary values "today", "yesterday" and "last_year_previous_month" are + formatted into the dynamic_date_format. + + The other values and their formatting: + - "current_month" - A string date formatted with a string format '%m'. + - "last_month" - A string date formatted with a format "%mm". + - "current_year" - A string date formatted with a format "%mm". + - "last_year" - A string date formatted with a string format '%Y' + - "now_time" - A string date formatted with a string format '%H%M%S'. + + Returns: + dict[str, str]: A dictionary with key phrases as keys + and dynamically created dates as values. + """ + today = pendulum.today(self.dynamic_date_timezone) + yesterday = pendulum.yesterday(self.dynamic_date_timezone) + last_month = today.subtract(months=1).month + last_year = today.subtract(years=1) + now_time = pendulum.now(self.dynamic_date_timezone) + last_day_prev_month = today.subtract(months=1).end_of("month") + + return { + "today": today.strftime(self.dynamic_date_format), + "yesterday": yesterday.strftime(self.dynamic_date_format), + "current_month": today.strftime("%m"), + "last_month": f"{last_month:02d}", + "current_year": today.strftime("%Y"), + "last_year": last_year.strftime("%Y"), + "now_time": now_time.strftime("%H%M%S"), + "last_day_previous_month": last_day_prev_month.strftime( + self.dynamic_date_format + ), + } + + def _handle_singular_dates( + self, dynamic_date_marker: str, match: list[tuple], key: str + ) -> pendulum.DateTime | str | None: # type: ignore + """Directs execution of a specific function based on the value of `key`. + + Args: + match (list[tuple]): list of every pattern match that occurs in a string. + key (str): Key phrase that determines the execution of a specific function. + + Returns: + str or pendulum.DateTime: + - If key == 'x_years_ago_year' returns string of a pendulum date formatted + with a pendulum token 'Y'. + - If key != 'x_years_ago_year' returns a pendulum.DateTime + """ + if key == "last_day_of_month": + for month_name, year in match: + replacement = self._process_last_day_of_month(year, month_name) + + elif key == "x_units_ago_full_date": + for x, units in match: + replacement = self._get_date_x_ago_full_date(int(x), units) + + elif key == "x_years_ago_year": + for x in match: + replacement = self._process_x_years_ago(int(x)) # type: ignore + else: + replacement = dynamic_date_marker + return replacement + + def _generate_dates_based_on_unit( + self, dynamic_date_marker: str, number: int, unit: str + ) -> list[str] | str: + """Direct execution of a specific function based on the value of `unit`. + + Possible values of `unit` correspond to different date formatting styles: + - 'years': Return a date formatted with a pendulum token 'Y'. + - 'months': Return a date formatted with a pendulum token 'YMM'. + - 'days': Return a date with a pendulum token 'YMMDD'. + + Args: + number (int): The number of units from the current year to include. + unit (str): The unit of time ('years', 'months', 'days'). + + Returns: + list[str] or string: + - If the key argument matches the given options, returns + a list of dynamically created dates generated based on `unit` + in an ascending order. + - If the key argument doesn't match the given options, + returns the unhandled dynamic_date_marker parameter + """ + if unit == "years": + return self._generate_years( + last_years=number, from_year=None, num_years=None + ) + if unit == "months": + return self._generate_months(last_months=number) + if unit == "days": + return self._generate_dates(last_days=number) + return dynamic_date_marker + + def _handle_data_ranges( + self, dynamic_date_marker: str, match_found: list[tuple], key: str + ) -> list[str] | str: + """Direct execution of a specific function based on the provided value of `key`. + + Depending on a unit ('years'/'months'/'days') the `match_found` refers to, + date formatting style differs: + - 'years': Return a date formatted with a pendulum token 'Y'. + - 'months': Return a date formatted with a pendulum token 'YMM'. + - 'days': Return a date with a pendulum token 'YMMDD'. + + Args: + dynamic_date_marker (str): A dynamic date marker that has been found in text + including the dynamic_date_symbols. + match_found (list[tuple]): list of every pattern match that occurs in + a string. + key (str): Key phrase that determines the execution of a specific function. + + Returns: + list[str] or string: + - If the key argument matches the given options, + returns list of extracted date ranges in ascending order. + - If the key argument doesn't match the given options, + returns the unhandled dynamic_date_marker parameter + """ + if key == "last_x_units": + for number, unit in match_found: + return self._generate_dates_based_on_unit( + dynamic_date_marker, int(number), unit + ) + + elif key == "y_years_from_x": + for number, start_year in match_found: + return self._generate_years( + last_years=None, + from_year=start_year, + num_years=int(number), # type: ignore + ) + + elif key == "first_x_days_from": + for num_days, month_name, year in match_found: + return self._process_first_days(month_name, year, int(num_days)) + + elif key == "last_x_days_from": + for num_days, month_name, year in match_found: + return self._process_last_days(month_name, year, int(num_days)) + + return dynamic_date_marker + + def _process_string(self, text: str) -> list[str] | str: + """Analyze and extract date ranges or singular dates from the given text. + + It bases on specific patterns or pendulum dates. + + Args: + text (str): A string containing various time-related patterns + to be analyzed. + + Returns: + list or string: + - If the input is a key phrase for a data range, + returns list of extracted date ranges in ascending order. + - If the input is a key phrase for a single date or a pendulum date, + returns the input text with an accurate date. + """ + start_symbol, end_symbol = self.dynamic_date_symbols + start, end = re.escape(start_symbol), re.escape(end_symbol) + pattern = rf"{start}.*?{end}" + + matches_between_symbols = re.findall(pattern, text, re.IGNORECASE) + if not matches_between_symbols: + return text + + for match in matches_between_symbols: + match_no_symbols = match[len(start_symbol) : -len(end_symbol)] + replacement = None + # Processing the singular dates + for key, pattern in self.singular_patterns.items(): + match_found = re.findall(pattern, match_no_symbols, re.IGNORECASE) + if match_found: + replacement = self._handle_singular_dates(match, match_found, key) + + # Process range date matches + for key, pattern in self.range_patterns.items(): + match_found = re.findall(pattern, match_no_symbols, re.IGNORECASE) + if match_found: + return self._handle_data_ranges(match, match_found, key) + + if match_no_symbols in self.replacements: + replacement = self.replacements[match_no_symbols] + if not replacement and bool( + re.match(r"^\s*pendulum\.\w+\(.*\)\s*$", match_no_symbols) + ): + replacement = eval(match_no_symbols) # noqa: S307 + text = text.replace( + match, + ( + replacement.strftime(self.dynamic_date_format) + if isinstance(replacement, pendulum.DateTime) # type: ignore + else replacement + ), + ) + + return text + + def process_dates(self, processed_input: str | list[str]) -> str | list[str]: + """Process an input by processing dates within it. + + If the input is a string, it applies the _process_string() function. + If the input is a list, it recursively processes each list item. + + Args: + processed_input (str or list): The segment to be processed. + + Returns: + str or list: + - If processed_input is a string, returns the processed string. + - If processed_input is a list, returns a list of processed strings. + """ + if isinstance(processed_input, str): + return self._process_string(processed_input) + if isinstance(processed_input, list): + return [self.process_dates(sub_segment) for sub_segment in processed_input] # type: ignore + return processed_input + async def list_block_documents() -> list[Any]: """Retrieve list of Prefect block documents.""" @@ -188,7 +625,7 @@ async def shell_run_command( Args: command: Shell command to be executed; can also be provided post-initialization by calling this task instance. - env: Dictionary of environment variables to use for + env: dictionary of environment variables to use for the subprocess; can also be provided at runtime. helper_command: String representing a shell command, which will be executed prior to the `command` in the same process. diff --git a/tests/unit/orchestration/prefect/test_utils.py b/tests/unit/orchestration/prefect/test_utils.py new file mode 100644 index 000000000..9f7c2678f --- /dev/null +++ b/tests/unit/orchestration/prefect/test_utils.py @@ -0,0 +1,146 @@ +import pendulum +import pytest + +from viadot.orchestration.prefect.utils import DynamicDateHandler + + +ddh1 = DynamicDateHandler( + ["<<", ">>"], dynamic_date_format="%Y%m%d", dynamic_date_timezone="Europe/Warsaw" +) + +ddh2 = DynamicDateHandler( + ["[[", "]]"], dynamic_date_format="%Y%m%d", dynamic_date_timezone="Europe/Warsaw" +) + + +@pytest.fixture +def setup_dates(): + """Fixture to provide the current dates for comparison in tests.""" + today = pendulum.today("Europe/Warsaw") + yesterday = pendulum.yesterday("Europe/Warsaw") + last_month = today.subtract(months=1).month + last_year = today.subtract(years=1) + last_day_prev_month = today.subtract(months=1).end_of("month") + now_time = pendulum.now("Europe/Warsaw") + return { + "today": today.strftime("%Y%m%d"), + "yesterday": yesterday.strftime("%Y%m%d"), + "current_month": today.strftime("%m"), + "last_month": f"{last_month:02d}", + "current_year": today.strftime("%Y"), + "last_year": last_year.strftime("%Y"), + "last_day_previous_month": last_day_prev_month.strftime("%Y%m%d"), + "now_time": now_time.strftime("%H%M%S"), + } + + +def test_create_dates(setup_dates): + """Test if create_dates function returns correct dictionary values.""" + keys_to_compare = [ + "today", + "yesterday", + "current_month", + "last_month", + "current_year", + "last_year", + "last_day_previous_month", + ] + assert all(setup_dates[key] == ddh1.replacements[key] for key in keys_to_compare) + + +def test_process_dates_single(setup_dates): + """Test if process_dates function replaces a single date placeholder.""" + text = "Today's date is <>." + replaced_text = ddh1.process_dates(text) + expected_text = f"Today's date is {setup_dates['today']}." + assert replaced_text == expected_text + + +def test_process_dates_multiple(setup_dates): + """Test if process_dates function replaces multiple placeholders.""" + text = "Today is <>, yesterday was <>" + replaced_text = ddh1.process_dates(text) + expected_text = ( + f"Today is {setup_dates['today']}, yesterday was {setup_dates['yesterday']}" + ) + assert replaced_text == expected_text + + +def test_process_dates_with_custom_symbols(setup_dates): + """Test if process_dates function works with custom start and end symbols.""" + text = "The year is [[current_year]]." + replaced_text = ddh2.process_dates(text) + expected_text = f"The year is {setup_dates['current_year']}." + assert replaced_text == expected_text + + +def test_process_dates_with_malformed_keys(): + """Test if process_dates function leaves malformed placeholders unchanged.""" + text = "This is a malformed placeholder: <>" + processed_range = ddh1.process_dates(text) + x_years_ago = pendulum.today().year - (x - 1) + expected_result = [str(x_years_ago + i) for i in range(x)] + assert processed_range == expected_result + + +def test_process_last_x_months(): + """Test if process_dates function returns a date range of last x years.""" + x = 18 + text = f"<>" + processed_range = ddh1.process_dates(text) + start_date = pendulum.today().subtract(months=(x - 1)) + expected_result = [ + start_date.add(months=i).start_of("month").format("YMM") for i in range(x) + ] + assert processed_range == expected_result + + +def test_process_y_years_from_x(): + """Test if `process_dates()` returns a range of years from a given start year.""" + x = 2019 + y = 4 + text = f"<<{y}_years_from_{x}>>" + processed_range = ddh1.process_dates(text) + expected_result = [str(x + i) for i in range(y)] + assert processed_range == expected_result + + +def test_process_string(): + """Test the `_process_string` function to verify it correctly processes dates.""" + result = ddh1._process_string("Let's meet <>.") + assert result == f"Let's meet {pendulum.today().strftime('%Y%m%d')}." + + result = ddh1._process_string( + "The event is scheduled for <>." + ) + assert ( + result + == f"The event is scheduled for {pendulum.tomorrow().strftime('%Y-%m-%d')}." + )