From 052084ca0daeb58611cad3f00633543cdc87e2f7 Mon Sep 17 00:00:00 2001
From: adrian-wojcik
Date: Wed, 4 Sep 2024 09:59:14 +0200
Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=9A=80=20Added=20`Eurostat`=20connect?=
 =?UTF-8?q?or=20with=20tests?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../orchestration/prefect/flows/__init__.py   |   3 +-
 .../prefect/flows/eurostat_to_adls.py         | 112 ++++++
 .../orchestration/prefect/tasks/__init__.py   |   3 +-
 .../orchestration/prefect/tasks/eurostat.py   |  82 ++++
 src/viadot/sources/__init__.py                |   3 +-
 src/viadot/sources/eurostat.py                | 356 ++++++++++++++++++
 .../prefect/flows/test_eurostat_to_adls.py    |  26 ++
 .../prefect/tasks/test_eurostat_to_df.py      |  30 ++
 tests/unit/test_eurostat.py                   | 223 +++++++++++
 tests/unit/test_eurostat_response.json        |  43 +++
 10 files changed, 878 insertions(+), 3 deletions(-)
 create mode 100644 src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
 create mode 100644 src/viadot/orchestration/prefect/tasks/eurostat.py
 create mode 100644 src/viadot/sources/eurostat.py
 create mode 100644 tests/integration/orchestration/prefect/flows/test_eurostat_to_adls.py
 create mode 100644 tests/integration/orchestration/prefect/tasks/test_eurostat_to_df.py
 create mode 100644 tests/unit/test_eurostat.py
 create mode 100644 tests/unit/test_eurostat_response.json

diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py
index 9193fb2ed..885be2003 100644
--- a/src/viadot/orchestration/prefect/flows/__init__.py
+++ b/src/viadot/orchestration/prefect/flows/__init__.py
@@ -20,7 +20,7 @@
 from .sql_server_to_minio import sql_server_to_minio
 from .transform import transform
 from .transform_and_catalog import transform_and_catalog
-
+from .eurostat_to_adls import eurostat_to_adls
 
 __all__ = [
     "cloud_for_customers_to_adls",
@@ -43,4 +43,5 @@
     "sql_server_to_minio",
     "transform",
     "transform_and_catalog",
+    "eurostat_to_adls",
 ]
diff --git a/src/viadot/orchestration/prefect/flows/eurostat_to_adls.py b/src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
new file mode 100644
index 000000000..0467aafd0
--- /dev/null
+++ b/src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
@@ -0,0 +1,112 @@
+"""
+'eurostat_to_adls.py'.
+
+Prefect flow for the Eurostat Cloud API connector.
+
+This module provides a Prefect flow function that uses the Eurostat connector to:
+- Call the Prefect task wrapper to get a final DataFrame from the connector.
+- Upload that data to Azure Data Lake Storage.
+
+Typical usage example (the dataset code and parameters come from the test
+suite; the ADLS path is illustrative):
+
+    eurostat_to_adls(
+        dataset_code="ILC_DI04",
+        params={"hhtyp": "total", "indic_il": "med_e"},
+        adls_path="raw/eurostat/ilc_di04.parquet",
+    )
+
+Functions:
+
+    eurostat_to_adls(dataset_code, params, columns, tests, adls_path,
+    adls_credentials_secret, overwrite_adls, adls_config_key):
+        Flow to download data from the Eurostat API and upload it to ADLS.
+"""
+
+from prefect import flow
+
+from viadot.orchestration.prefect.tasks import df_to_adls, eurostat_to_df
+
+
+@flow(
+    name="extract--eurostat--adls",
+    description="""Flow for downloading data from the Eurostat platform via
+    the HTTPS REST API (no credentials required) and uploading it to
+    Azure Data Lake.""",
+    retries=1,
+    retry_delay_seconds=60,
+)
+def eurostat_to_adls(
+    dataset_code: str,
+    params: dict = None,
+    columns: list = None,
+    tests: dict = None,
+    adls_path: str = None,
+    adls_credentials_secret: str = None,
+    overwrite_adls: bool = False,
+    adls_config_key: str = None,
+) -> None:
+    """
+    Flow for downloading data from Eurostat to Azure Data Lake.
+
+    Args:
+        dataset_code (str): The code of the Eurostat dataset to be uploaded.
+        params (dict, optional):
+            A dictionary with optional URL parameters. The key represents the
+            parameter ID, while the value is the code for a specific parameter,
+            for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
+            to set and "EUR" is the specific parameter code. You can add more
+            than one parameter, but only one code per parameter! So you CANNOT
+            provide a list of codes, e.g., 'params = {'unit': ['EUR', 'USD',
+            'PLN']}'. This parameter is REQUIRED in most cases to pull a
+            specific dataset from the API. Both the parameter and code must be
+            provided as a string! Defaults to None.
+        columns (List[str], optional): List of needed columns from the
+            DataFrame - acts as a filter. The data downloaded from Eurostat has
+            the same structure every time. The filter is applied after the data
+            is fetched. Defaults to None.
+        tests (dict, optional): A dictionary with an optional list of tests to
+            run on the DataFrame:
+            - `column_size`: dict{column: size}
+            - `column_unique_values`: list[columns]
+            - `column_list_to_match`: list[columns]
+            - `dataset_row_count`: dict: {'min': number, 'max': number}
+            - `column_match_regex`: dict: {column: 'regex'}
+            - `column_sum`: dict: {column: {'min': number, 'max': number}}
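+
+            Example (illustrative values only; any combination of the test
+            keys above is accepted):
+                tests = {
+                    "column_list_to_match": ["geo", "time", "indicator"],
+                    "dataset_row_count": {"min": 1, "max": 100000},
+                }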
+        adls_path (str, optional): Azure Data Lake destination file path.
+            Defaults to None.
+        adls_credentials_secret (str, optional): The name of the Azure Key
+            Vault secret containing a dictionary with ACCOUNT_NAME and Service
+            Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the
+            Azure Data Lake. Defaults to None.
+        overwrite_adls (bool, optional): Whether to overwrite files in the
+            lake. Defaults to False.
+        adls_config_key (str, optional): The key in the viadot config holding
+            relevant credentials. Defaults to None.
+    """
+    df = eurostat_to_df(
+        dataset_code=dataset_code,
+        params=params,
+        columns=columns,
+        tests=tests,
+    )
+    df_to_adls(
+        df=df,
+        path=adls_path,
+        credentials_secret=adls_credentials_secret,
+        config_key=adls_config_key,
+        overwrite=overwrite_adls,
+    )
diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py
index cbe8f7276..2aa3d08b0 100644
--- a/src/viadot/orchestration/prefect/tasks/__init__.py
+++ b/src/viadot/orchestration/prefect/tasks/__init__.py
@@ -22,7 +22,7 @@
     sharepoint_to_df,
 )
 from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
-
+from .eurostat import eurostat_to_df
 
 __all__ = [
     "adls_upload",
@@ -48,4 +48,5 @@
     "create_sql_server_table",
     "sql_server_query",
     "sql_server_to_df",
+    "eurostat_to_df",
 ]
diff --git a/src/viadot/orchestration/prefect/tasks/eurostat.py b/src/viadot/orchestration/prefect/tasks/eurostat.py
new file mode 100644
index 000000000..4f90a13e6
--- /dev/null
+++ b/src/viadot/orchestration/prefect/tasks/eurostat.py
@@ -0,0 +1,82 @@
+"""
+'eurostat.py'.
+
+Prefect task wrapper for the Eurostat Cloud API connector.
+
+This module provides an intermediate wrapper between the Prefect flow and the
+connector:
+- Instantiate the Eurostat Cloud API connector.
+- Create and return a pandas DataFrame with the API response.
+
+Typical usage example (the dataset code and parameters come from the test
+suite):
+
+    data_frame = eurostat_to_df(
+        dataset_code="ILC_DI04",
+        params={"hhtyp": "total", "indic_il": "med_e"},
+    )
+
+Functions:
+
+    eurostat_to_df(dataset_code, params, columns, tests):
+        Task to download data from the Eurostat Cloud API.
+"""
+
+import pandas as pd
+from prefect import task
+
+from viadot.sources import Eurostat
+
+
+@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
+def eurostat_to_df(
+    dataset_code: str,
+    params: dict = None,
+    columns: list = None,
+    tests: dict = None,
+) -> pd.DataFrame:
+    """Task for creating a pandas DataFrame from the Eurostat HTTPS REST API.
+
+    (no credentials required).
+
+    Args:
+        dataset_code (str): The code of the Eurostat dataset to be downloaded.
+        params (dict, optional):
+            A dictionary with optional URL parameters. The key represents the
+            parameter ID, while the value is the code for a specific parameter,
+            for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
+            to set and "EUR" is the specific parameter code. You can add more
+            than one parameter, but only one code per parameter! So you CANNOT
+            provide a list of codes, e.g., 'params = {'unit': ['EUR', 'USD',
+            'PLN']}'. This parameter is REQUIRED in most cases to pull a
+            specific dataset from the API. Both the parameter and code must be
+            provided as a string! Defaults to None.
+        columns (List[str], optional): List of needed column names. Names
+            should be given as strings. Defaults to None.
+        tests (dict, optional): A dictionary with an optional list of tests to
+            run on the DataFrame:
+            - `column_size`: dict{column: size}
+            - `column_unique_values`: list[columns]
+            - `column_list_to_match`: list[columns]
+            - `dataset_row_count`: dict: {'min': number, 'max': number}
+            - `column_match_regex`: dict: {column: 'regex'}
+            - `column_sum`: dict: {column: {'min': number, 'max': number}}
+
+    Returns:
+        pd.DataFrame: Pandas DataFrame.
+    """
+    return Eurostat(
+        dataset_code=dataset_code,
+        params=params,
+        columns=columns,
+        tests=tests,
+    ).to_df()
diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py
index 0d77c1bf2..7183ec32a 100644
--- a/src/viadot/sources/__init__.py
+++ b/src/viadot/sources/__init__.py
@@ -13,7 +13,7 @@
 from .sql_server import SQLServer
 from .trino import Trino
 from .uk_carbon_intensity import UKCarbonIntensity
-
+from .eurostat import Eurostat
 
 __all__ = [
     "CloudForCustomers",
@@ -26,6 +26,7 @@
     "Trino",
     "SQLServer",
     "UKCarbonIntensity",
+    "Eurostat",
 ]
 
 if find_spec("adlfs"):
diff --git a/src/viadot/sources/eurostat.py b/src/viadot/sources/eurostat.py
new file mode 100644
index 000000000..f54f0828d
--- /dev/null
+++ b/src/viadot/sources/eurostat.py
@@ -0,0 +1,356 @@
+"""
+'eurostat.py'.
+
+Structure for the Eurostat API connector.
+
+This module provides functionalities for connecting to the Eurostat API and
+downloading datasets. It includes the following features:
+- Pulling a JSON file with all data from a specific dataset.
+- Creating a pandas DataFrame from the pulled JSON.
+- Validating dataset parameters if specified.
+
+Typical usage example (the dataset code and parameters come from the test
+suite):
+
+    eurostat = Eurostat(
+        dataset_code="ILC_DI04",
+        params={"hhtyp": "total", "indic_il": "med_e"},
+    )
+    df = eurostat.to_df()
+
+Functions:
+
+    get_parameters_codes(dataset_code: str, url: str): Validate available API
+        request parameters and their codes.
+    validate_params(dataset_code: str, url: str, params: dict): Validate given
+        parameters against the available parameters in the dataset.
+    eurostat_dictionary_to_df(*signals: list): Create a DataFrame from the
+        JSON pulled from Eurostat.
+    to_df(if_empty: str = "warn"): Get the API response and create a DataFrame
+        using the method 'eurostat_dictionary_to_df', validating the provided
+        parameters and their codes if needed.
+"""
+
+import pandas as pd
+
+from viadot.utils import (
+    APIError,
+    add_viadot_metadata_columns,
+    filter_df_columns,
+    handle_api_response,
+    validate,
+)
+
+from .base import Source
+
+
+class Eurostat(Source):
+    """Class for creating an instance of the Eurostat REST API connector.
+
+    (no credentials required).
+    """
+
+    base_url = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/"
+
+    def __init__(
+        self,
+        *args,
+        dataset_code: str,
+        params: dict = None,
+        columns: list = None,
+        tests: dict = None,
+        **kwargs
+    ):
+        """Use an HTTPS REST request to pull the data.
+
+        No API registration or API key is required. Data is pulled based on the
+        parameters provided in the dynamic part of the URL.
+
+        Example of URL: https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/
+        data/TEIBS020/?format=JSON&lang=EN&indic=BS-CSMCI-BAL
+
+        Static part: https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data
+        Dynamic part: /TEIBS020/?format=JSON&lang=EN&indic=BS-CSMCI-BAL
+
+        Please note that one dataset usually contains data on multiple
+        subjects. In order to retrieve the data you are interested in, you have
+        to provide parameters with codes in 'params'.
+
+        The class attribute `base_url` specifies the root URL for all requests
+        made to the API. It should not be modified unless the API changes its
+        URL scheme.
+
+        Args:
+            dataset_code (str): The code of the Eurostat dataset to be
+                downloaded.
+            params (dict, optional):
+                A dictionary with optional URL parameters. The key represents
+                the parameter ID, while the value is the code for a specific
+                parameter, for example: params = {'unit': 'EUR'} where "unit"
+                is the parameter to set and "EUR" is the specific parameter
+                code. You can add more than one parameter, but only one code
+                per parameter! So you CANNOT provide a list of codes, e.g.,
+                'params = {'unit': ['EUR', 'USD', 'PLN']}'. These parameters
+                are REQUIRED in most cases to pull a specific dataset from the
+                API. Both the parameter and code have to be provided as a
+                string! Defaults to None.
+            columns (List[str], optional): List of needed column names. Names
+                should be given as strings. Defaults to None.
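+                Example (illustrative; these column names appear in the
+                connector's output, as asserted in the test suite):
+                    columns = ["geo", "time", "indicator"]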
+            tests (dict, optional): A dictionary with an optional list of
+                tests to run on the DataFrame:
+                - `column_size`: dict{column: size}
+                - `column_unique_values`: list[columns]
+                - `column_list_to_match`: list[columns]
+                - `dataset_row_count`: dict: {'min': number, 'max': number}
+                - `column_match_regex`: dict: {column: 'regex'}
+                - `column_sum`: dict: {column: {'min': number, 'max': number}}
+        """
+        self.dataset_code = dataset_code
+        self.params = params
+        self.columns = columns
+        self.tests = tests
+
+        super().__init__(*args, **kwargs)
+
+    def get_parameters_codes(self, dataset_code: str, url: str) -> dict:
+        """Validate available API request parameters and their codes.
+
+        Raises:
+            ValueError: If the response from the API is empty or invalid.
+
+        Returns:
+            Dict: Keys are parameters and values are lists of available codes
+            for the specific parameter.
+        """
+        try:
+            response = handle_api_response(url)
+            data = response.json()
+        except APIError as api_error:
+            self.logger.error(
+                f"Failed to fetch data for {dataset_code}, please check correctness "
+                "of dataset code!"
+            )
+            raise ValueError("DataFrame is empty!") from api_error
+
+        # Getting the list of available parameters
+        available_params = data["id"]
+
+        # Dictionary from the JSON with keys and related code values
+        dimension = data["dimension"]
+
+        # Assigning the list of available codes to each parameter
+        params_and_codes = {}
+        for key in available_params:
+            if key in dimension:
+                codes = list(dimension[key]["category"]["index"].keys())
+                params_and_codes[key] = codes
+        return params_and_codes
+
+    def validate_params(self, dataset_code: str, url: str, params: dict):
+        """Validate given parameters against the available parameters in the dataset.
+
+        Important:
+            Each dataset in Eurostat has specific parameters that can be used
+            for filtering the data. For example, the dataset ILC_DI04 - Mean
+            and median income by household type - EU-SILC and ECHP surveys -
+            has a parameter such as hhtyp (Type of household), which can be
+            filtered by a specific available code of this parameter, such as:
+            'total', 'a1' (single person), 'a1_dhc' (single person with
+            dependent children). Please note that each dataset has different
+            parameters and different codes.
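+
+        Example (illustrative; these are the codes exercised in the unit
+        tests):
+            params = {"hhtyp": "total", "indic_il": "med_e"}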
" + "CORRECT: params = {'unit': 'EUR'} | INCORRECT: params = " + "{'unit': ['EUR', 'USD', 'PLN']}" + ) + raise ValueError("Wrong structure of params!") + + if key_codes is not None: + # Conversion keys and values on lower cases by using casefold + key_codes_after_conversion = { + k.casefold(): [v_elem.casefold() for v_elem in v] + for k, v in key_codes.items() + } + params_after_conversion = { + k.casefold(): v.casefold() for k, v in params.items() + } + + # Comparing keys and values + non_available_keys = [ + key + for key in params_after_conversion.keys() + if key not in key_codes_after_conversion + ] + non_available_codes = [ + value + for key, value in params_after_conversion.items() + if key in key_codes_after_conversion.keys() + and value not in key_codes_after_conversion[key] + ] + + # Error loggers + + if non_available_keys: + self.logger.error( + f"Parameters: '{' | '.join(non_available_keys)}' are not in " + "dataset. Please check your spelling!" + ) + + self.logger.error( + f"Possible parameters: {' | '.join(key_codes.keys())}" + ) + if non_available_codes: + self.logger.error( + f"Parameters codes: '{' | '.join(non_available_codes)}' are not " + "available. Please check your spelling!\n" + ) + self.logger.error( + "You can find everything via link: https://ec.europa.eu/" + f"eurostat/databrowser/view/{dataset_code}/default/table?lang=en" + ) + + if non_available_keys or non_available_codes: + raise ValueError("Wrong parameters or codes were provided!") + + def eurostat_dictionary_to_df(self, *signals: list) -> pd.DataFrame: + """Function for creating DataFrame from JSON pulled from Eurostat. + + Returns: + pd.DataFrame: Pandas DataFrame With 4 columns: index, geo, time, indicator + """ + + class TSignal: + """Class representing a signal with keys, indexes, labels, and name.""" + + signal_keys_list: list + signal_index_list: list + signal_label_list: list + signal_name: str + + # Dataframe creation + columns0 = signals[0].copy() + columns0.append("indicator") + df = pd.DataFrame(columns=columns0) + indicator_list = [] + index_list = [] + signal_lists = [] + + # Get the dictionary from the inputs + eurostat_dictionary = signals[-1] + + for signal in signals[0]: + signal_struct = TSignal() + signal_struct.signal_name = signal + signal_struct.signal_keys_list = list( + eurostat_dictionary["dimension"][signal]["category"]["index"].keys() + ) + signal_struct.signal_index_list = list( + eurostat_dictionary["dimension"][signal]["category"]["index"].values() + ) + signal_label_dict = eurostat_dictionary["dimension"][signal]["category"][ + "label" + ] + signal_struct.signal_label_list = [ + signal_label_dict[i] for i in signal_struct.signal_keys_list + ] + signal_lists.append(signal_struct) + + col_signal_temp = [] + row_signal_temp = [] + for row_index, row_label in zip( + signal_lists[0].signal_index_list, signal_lists[0].signal_label_list + ): # rows + for col_index, col_label in zip( + signal_lists[1].signal_index_list, signal_lists[1].signal_label_list + ): # cols + index = str( + col_index + row_index * len(signal_lists[1].signal_label_list) + ) + if index in eurostat_dictionary["value"].keys(): + index_list.append(index) + col_signal_temp.append(col_label) + row_signal_temp.append(row_label) + + indicator_list = [eurostat_dictionary["value"][i] for i in index_list] + df.indicator = indicator_list + df[signal_lists[1].signal_name] = col_signal_temp + df[signal_lists[0].signal_name] = row_signal_temp + + return df + + @add_viadot_metadata_columns + def to_df( + self, + 
+        for row_index, row_label in zip(
+            signal_lists[0].signal_index_list, signal_lists[0].signal_label_list
+        ):  # rows
+            for col_index, col_label in zip(
+                signal_lists[1].signal_index_list, signal_lists[1].signal_label_list
+            ):  # cols
+                index = str(
+                    col_index + row_index * len(signal_lists[1].signal_label_list)
+                )
+                if index in eurostat_dictionary["value"]:
+                    index_list.append(index)
+                    col_signal_temp.append(col_label)
+                    row_signal_temp.append(row_label)
+
+        indicator_list = [eurostat_dictionary["value"][i] for i in index_list]
+        df["indicator"] = indicator_list
+        df[signal_lists[1].signal_name] = col_signal_temp
+        df[signal_lists[0].signal_name] = row_signal_temp
+
+        return df
+
+    @add_viadot_metadata_columns
+    def to_df(
+        self,
+        if_empty: str = "warn",
+    ) -> pd.DataFrame:
+        """Get the API response and create a DataFrame.
+
+        Uses the method 'eurostat_dictionary_to_df', validating the provided
+        parameters and their codes if needed.
+
+        Args:
+            if_empty (str, optional): What to do if the DataFrame is empty.
+                Defaults to "warn".
+
+        Raises:
+            TypeError: If self.params is a different type than a dictionary.
+            TypeError: If self.columns is a different type than a list
+                containing strings.
+            APIError: If there is an error with the API request.
+
+        Returns:
+            pd.DataFrame: Pandas DataFrame.
+        """
+        # Checking that params and columns have the correct type
+        if not isinstance(self.params, dict) and self.params is not None:
+            raise TypeError("Params should be a dictionary.")
+
+        if not isinstance(self.columns, list) and self.columns is not None:
+            raise TypeError("Requested columns should be provided as list of strings.")
+
+        # Creating the URL for the connection with the API, e.g.
+        # https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/ILC_DI04?format=JSON&lang=EN
+        url = f"{self.base_url}{self.dataset_code}?format=JSON&lang=EN"
+
+        # Validating the provided parameters
+        if self.params is not None:
+            self.validate_params(
+                dataset_code=self.dataset_code, url=url, params=self.params
+            )
+
+        # Getting the response from the API
+        try:
+            response = handle_api_response(url, params=self.params)
+            data = response.json()
+            data_frame = self.eurostat_dictionary_to_df(["geo", "time"], data)
+        except APIError:
+            # Re-run the parameter validation to surface a more informative
+            # error message, then re-raise instead of continuing with
+            # undefined data.
+            if self.params is not None:
+                self.validate_params(
+                    dataset_code=self.dataset_code, url=url, params=self.params
+                )
+            raise
+
+        # Merge the DataFrame with the label and the last updated date
+        label_col = pd.Series(str(data["label"]), index=data_frame.index, name="label")
+        last_updated_col = pd.Series(
+            str(data["updated"]),
+            index=data_frame.index,
+            name="updated",
+        )
+        data_frame = pd.concat([data_frame, label_col, last_updated_col], axis=1)
+
+        # Validation and filtering of the requested columns
+        if self.columns is not None:
+            filter_df_columns(data_frame=data_frame, columns=self.columns)
+
+        # Additional validation from utils
+        validate(df=data_frame, tests=self.tests)
+
+        return data_frame
diff --git a/tests/integration/orchestration/prefect/flows/test_eurostat_to_adls.py b/tests/integration/orchestration/prefect/flows/test_eurostat_to_adls.py
new file mode 100644
index 000000000..e4f633c8b
--- /dev/null
+++ b/tests/integration/orchestration/prefect/flows/test_eurostat_to_adls.py
@@ -0,0 +1,26 @@
+"""Test the `eurostat_to_adls` flow."""
+
+from viadot.orchestration.prefect.flows import (
+    eurostat_to_adls,
+)
+from viadot.sources import AzureDataLake
+
+TEST_FILE_PATH = "raw/viadot_2_0_TEST_eurostat.parquet"
+
+
+def test_eurostat_to_adls():
+    """Test uploading data from Eurostat to ADLS."""
+    lake = AzureDataLake(config_key="adls_test")
+
+    assert not lake.exists(TEST_FILE_PATH)
+
+    eurostat_to_adls(
+        dataset_code="ILC_DI04",
+        adls_path=TEST_FILE_PATH,
+        adls_credentials_secret="sp-adls-test",
+    )
+
+    assert lake.exists(TEST_FILE_PATH)
+
+    lake.rm(TEST_FILE_PATH)
diff --git a/tests/integration/orchestration/prefect/tasks/test_eurostat_to_df.py b/tests/integration/orchestration/prefect/tasks/test_eurostat_to_df.py
new file mode 100644
index 000000000..eddc54145
--- /dev/null
+++ b/tests/integration/orchestration/prefect/tasks/test_eurostat_to_df.py
@@ -0,0 +1,30 @@
+"""Test the `eurostat_to_df` task."""
+
+import pandas as pd
+from prefect import flow
+
+from viadot.orchestration.prefect.tasks import eurostat_to_df
+
+
+def test_task_connection():
+    """Test the Eurostat connection."""
+
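+    # eurostat_to_df is a Prefect task, so it is called inside a throwaway
+    # flow to give it a proper run context.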
params={"hhtyp": "total", "indic_il": "med_e"}, + ) + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert list(df.columns) == [ + "geo", + "time", + "indicator", + "label", + "updated", + "_viadot_source", + "_viadot_downloaded_at_utc", + ] + + test_eurostat_to_df() diff --git a/tests/unit/test_eurostat.py b/tests/unit/test_eurostat.py new file mode 100644 index 000000000..9624a1448 --- /dev/null +++ b/tests/unit/test_eurostat.py @@ -0,0 +1,223 @@ +"""'test_eurostat.py'.""" + +import json +import logging + +import pandas as pd +import pytest +from viadot.sources import Eurostat + + +class EurostatMock(Eurostat): + """Mock of Eurostat source class.""" + + def __init__(self, dataset_code=None): + # Inicjalizacja bez dataset_code + super().__init__(dataset_code=dataset_code) + + def _download_json(self): + with open("test_eurostat_response.json") as file: + data = json.load(file) + return data + + +def test_eurostat_dictionary_to_df(): + """Test eurostat_dictionary_to_df method from source class.""" + eurostat = EurostatMock(dataset_code="") # Możesz przekazać pusty string lub None + data = eurostat._download_json() + + result_df = eurostat.eurostat_dictionary_to_df(["geo", "time"], data) + + assert list(result_df.columns) == ["geo", "time", "indicator"] + + expected_years = ["2020", "2021", "2022"] + assert result_df["time"].unique().tolist() == expected_years + + expected_geo = ["Germany", "France", "Italy"] + assert result_df["geo"].unique().tolist() == expected_geo + + expected_indicator = [ + 100, + 150, + 200, + 110, + 160, + 210, + 120, + 170, + 220, + ] + assert result_df["indicator"].unique().tolist() == expected_indicator + +URL = ( + "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0" + "/data/ILC_DI04?format=JSON&lang=EN" +) + + +def test_wrong_dataset_code_logger(caplog): + """Tests that the error logging feature correctly. + + Identifies and logs errors for incorrect dataset codes. + """ + task = Eurostat(dataset_code="ILC_DI04E") + + with pytest.raises(ValueError, match="DataFrame is empty!"): + with caplog.at_level(logging.ERROR): + task.to_df() + assert ( + "Failed to fetch data for ILC_DI04E, please check correctness of dataset code!" + in caplog.text + ) + + +def test_and_validate_dataset_code_without_params(caplog): + """Tests that the data retrieval feature returns a non-empty DataFrame. + + For a valid dataset code + """ + task = Eurostat(dataset_code="ILC_DI04").to_df() + + assert isinstance(task, pd.DataFrame) + assert not task.empty + assert caplog.text == "" + + +def test_wrong_parameters_codes_logger(caplog): + """Tests error logging for incorrect parameter codes with a correct dataset code.""" + params = {"hhtyp": "total1", "indic_il": "non_existing_code"} + dataset_code = "ILC_DI04" + + with pytest.raises(ValueError, match="Wrong parameters or codes were provided!"): + with caplog.at_level(logging.ERROR): + Eurostat(dataset_code=dataset_code).validate_params( + dataset_code=dataset_code, url=URL, params=params + ) + assert ( + "Parameters codes: 'total1 | non_existing_code' are not available. " + "Please check your spelling!" in caplog.text + ) + assert ( + "You can find everything via link: " + "https://ec.europa.eu/eurostat/databrowser/view/ILC_DI04/default/table?lang=en" + in caplog.text + ) + + +def test_parameter_codes_as_list_logger(caplog): + """Tests error logging for incorrect parameter codes structure. + + And with a correct dataset code. 
+    dataset_code = "ILC_DI04"
+    params = {"hhtyp": ["totale", "nottotale"], "indic_il": "med_e"}
+
+    with pytest.raises(ValueError, match="Wrong structure of params!"):
+        with caplog.at_level(logging.ERROR):
+            Eurostat(dataset_code=dataset_code).validate_params(
+                dataset_code=dataset_code, url=URL, params=params
+            )
+    assert (
+        "You can provide only one code per one parameter as 'str' in params! "
+        "CORRECT: params = {'unit': 'EUR'} | INCORRECT: params = "
+        "{'unit': ['EUR', 'USD', 'PLN']}" in caplog.text
+    )
+
+
+def test_wrong_parameters(caplog):
+    """Test error logging for incorrect parameter keys.
+
+    Uses a correct dataset code.
+    """
+    dataset_code = "ILC_DI04"
+    params = {"hhhtyp": "total", "indic_ilx": "med_e"}
+
+    with pytest.raises(ValueError, match="Wrong parameters or codes were provided!"):
+        with caplog.at_level(logging.ERROR):
+            Eurostat(dataset_code=dataset_code).validate_params(
+                dataset_code=dataset_code, url=URL, params=params
+            )
+    assert (
+        "Parameters: 'hhhtyp | indic_ilx' are not in dataset. "
+        "Please check your spelling!" in caplog.text
+    )
+    assert (
+        "Possible parameters: freq | hhtyp | indic_il | unit | geo | time"
+        in caplog.text
+    )
+
+
+def test_correct_params_and_dataset_code(caplog):
+    """Test that data retrieval returns a non-empty DataFrame.
+
+    Uses a valid dataset code with correct parameters.
+    """
+    task = Eurostat(
+        dataset_code="ILC_DI04",
+        params={"hhtyp": "total", "indic_il": "med_e"},
+    ).to_df()
+
+    assert isinstance(task, pd.DataFrame)
+    assert not task.empty
+    assert caplog.text == ""
+
+
+def test_wrong_needed_columns_names(caplog):
+    """Test error logging for incorrect names of requested columns.
+
+    Uses a correct dataset code and parameters.
+    """
+    task = Eurostat(
+        dataset_code="ILC_DI04",
+        params={"hhtyp": "total", "indic_il": "med_e"},
+        columns=["updated1", "geo1", "indicator1"],
+    )
+
+    with pytest.raises(ValueError, match="Provided columns are not available!"):
+        with caplog.at_level(logging.ERROR):
+            task.to_df()
+    assert (
+        "Name of the columns: 'updated1 | geo1 | indicator1' are not in DataFrame. "
+        "Please check spelling!" in caplog.text
+    )
+    assert "Available columns: geo | time | indicator | label | updated" in caplog.text
+
+
+def test_wrong_params_and_wrong_requested_columns_names(caplog):
+    """Test error logging for incorrect parameters and column names.
+
+    Uses a correct dataset code.
+    """
+    task = Eurostat(
+        dataset_code="ILC_DI04",
+        params={"hhhtyp": "total", "indic_ilx": "med_e"},
+        columns=["updated1", "geo1", "indicator1"],
+    )
+
+    with pytest.raises(ValueError, match="Wrong parameters or codes were provided!"):
+        with caplog.at_level(logging.ERROR):
+            task.to_df()
+    assert (
+        "Parameters: 'hhhtyp | indic_ilx' are not in dataset. "
+        "Please check your spelling!" in caplog.text
+    )
+    assert (
+        "Possible parameters: freq | hhtyp | indic_il | unit | geo | time"
+        in caplog.text
+    )
+
+
+def test_requested_columns_not_in_list():
+    """Test the error raised for an incorrect requested-columns structure.
+
+    Uses a correct dataset code and parameters.
+    """
+    with pytest.raises(
+        TypeError, match="Requested columns should be provided as list of strings."
+    ):
+        Eurostat(
+            dataset_code="ILC_DI04",
+            params={"hhtyp": "total", "indic_il": "med_e"},
+            columns="updated",
+        ).to_df()
+
+
+def test_params_as_list():
+    """Test the error raised for an incorrect params structure.
+
+    Uses a correct dataset code.
+    """
+    with pytest.raises(TypeError, match="Params should be a dictionary."):
+        Eurostat(dataset_code="ILC_DI04", params=["total", "med_e"]).to_df()
diff --git a/tests/unit/test_eurostat_response.json b/tests/unit/test_eurostat_response.json
new file mode 100644
index 000000000..6a403b157
--- /dev/null
+++ b/tests/unit/test_eurostat_response.json
@@ -0,0 +1,43 @@
+{
+    "dimension": {
+        "geo": {
+            "category": {
+                "index": {
+                    "DE": 0,
+                    "FR": 1,
+                    "IT": 2
+                },
+                "label": {
+                    "DE": "Germany",
+                    "FR": "France",
+                    "IT": "Italy"
+                }
+            }
+        },
+        "time": {
+            "category": {
+                "index": {
+                    "2020": 0,
+                    "2021": 1,
+                    "2022": 2
+                },
+                "label": {
+                    "2020": "2020",
+                    "2021": "2021",
+                    "2022": "2022"
+                }
+            }
+        }
+    },
+    "value": {
+        "0": 100,
+        "1": 150,
+        "2": 200,
+        "3": 110,
+        "4": 160,
+        "5": 210,
+        "6": 120,
+        "7": 170,
+        "8": 220
+    }
+}

From ec0268a110ea1962e412b202262c62c73e692815 Mon Sep 17 00:00:00 2001
From: adrian-wojcik
Date: Wed, 4 Sep 2024 10:02:49 +0200
Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=9A=80=20Adding=20entry=20in=20change?=
 =?UTF-8?q?log?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e70f5016c..ac76cc80b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+- Added new version of `Eurostat` connector and test files.
 - Added new version of `Genesys` connector and test files.
 - Added new version of `Outlook` connector and test files.
 - Added new version of `Hubspot` connector and test files.