Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eurostat 2.0 #1027

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Added new version of `Eurostat` connector and test files.
- Added new version of `Genesys` connector and test files.
- Added new version of `Outlook` connector and test files.
- Added new version of `Hubspot` connector and test files.
Expand Down
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .sql_server_to_minio import sql_server_to_minio
from .transform import transform
from .transform_and_catalog import transform_and_catalog

from .eurostat_to_adls import eurostat_to_adls

__all__ = [
"cloud_for_customers_to_adls",
Expand All @@ -43,4 +43,5 @@
"sql_server_to_minio",
"transform",
"transform_and_catalog",
"eurostat_to_adls",
]
112 changes: 112 additions & 0 deletions src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Prefect flow for downloading Eurostat data and uploading it to Azure Data Lake Storage.

Prefect flow for the Eurostat Cloud API connector.

This module provides a prefect flow function to use the Eurostat connector:
- Call to the prefect task wrapper to get a final Data Frame from the connector.
- Upload that data to Azure Data Lake Storage.

Typical usage example:

eurostat_to_adls(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
adls_path: str = None,
adls_credentials_secret: str = None,
overwrite_adls: bool = False,
adls_config_key: str = None,
)

Functions:

eurostat_to_adls(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
adls_path: str = None,
adls_credentials_secret: str = None,
overwrite_adls: bool = False,
adls_config_key: str = None,
):
Flow to download data from Eurostat Cloud API and upload to ADLS.
"""

from prefect import flow

from viadot.orchestration.prefect.tasks import df_to_adls, eurostat_to_df


@flow(
    name="extract--eurostat--adls",
    description="""Flow for downloading data from the Eurostat platform via
    HTTPS REST API (no credentials required) and uploading it to
    Azure Data Lake Storage.""",
    retries=1,
    retry_delay_seconds=60,
)
def eurostat_to_adls(
    dataset_code: str,
    params: dict = None,
    columns: list = None,
    tests: dict = None,
    adls_path: str = None,
    adls_credentials_secret: str = None,
    overwrite_adls: bool = False,
    adls_config_key: str = None,
):
    """Download data from the Eurostat API and upload it to Azure Data Lake.

    Runs the `eurostat_to_df` task to build a pandas DataFrame from the
    Eurostat HTTPS REST API, then hands it to `df_to_adls` for the upload.

    Args:
        dataset_code (str): The code of the Eurostat dataset to be uploaded.
        params (dict, optional):
            A dictionary with optional URL parameters. The key represents the
            parameter ID, while the value is the code for a specific parameter,
            for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
            to set and "EUR" is the specific parameter code. You can add more
            than one parameter, but only one code per parameter! So you CANNOT
            provide a list of codes, e.g., 'params = {'unit': ['EUR', 'USD',
            'PLN']}'. This parameter is REQUIRED in most cases to pull a specific
            dataset from the API. Both the parameter and code must be provided
            as a string! Defaults to None.
        columns (list, optional): List of needed columns from the DataFrame
            - acts as a filter. The data downloaded from Eurostat has the same
            structure every time. The filter is applied after the data is
            fetched. Defaults to None.
        tests (dict, optional): Dictionary of validation tests to run on the
            downloaded DataFrame. Supported keys:
            - `column_size`: dict{column: size}
            - `column_unique_values`: list[columns]
            - `column_list_to_match`: list[columns]
            - `dataset_row_count`: dict: {'min': number, 'max': number}
            - `column_match_regex`: dict: {column: 'regex'}
            - `column_sum`: dict: {column: {'min': number, 'max': number}}
            Defaults to None.
        adls_path (str, optional): Azure Data Lake destination file path.
            Defaults to None.
        adls_credentials_secret (str, optional): The name of the Azure Key Vault
            secret containing a dictionary with ACCOUNT_NAME and Service Principal
            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data
            Lake. Defaults to None.
        overwrite_adls (bool, optional): Whether to overwrite files in the lake.
            Defaults to False.
        adls_config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.

    Returns:
        The result of the `df_to_adls` upload task.
    """
    df = eurostat_to_df(
        dataset_code=dataset_code,
        params=params,
        columns=columns,
        tests=tests,
    )
    # Return the upload task result so callers/downstream tasks can depend on it.
    return df_to_adls(
        df=df,
        path=adls_path,
        credentials_secret=adls_credentials_secret,
        config_key=adls_config_key,
        overwrite=overwrite_adls,
    )
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
sharepoint_to_df,
)
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df

from .eurostat import eurostat_to_df

__all__ = [
"adls_upload",
Expand All @@ -48,4 +48,5 @@
"create_sql_server_table",
"sql_server_query",
"sql_server_to_df",
"eurostat_to_df",
]
82 changes: 82 additions & 0 deletions src/viadot/orchestration/prefect/tasks/eurostat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
Prefect task wrapper for the Eurostat Cloud API connector.

Prefect task wrapper for the Eurostat Cloud API connector.

This module provides an intermediate wrapper between the prefect flow and the connector:
- Generate the Eurostat Cloud API connector.
- Create and return a pandas Data Frame with the response of the API.

Typical usage example:

data_frame = eurostat_to_df(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
)

Functions:

eurostat_to_df(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
):
Task to download data from Eurostat Cloud API.

"""

from prefect import task

from viadot.sources import Eurostat


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
def eurostat_to_df(
    dataset_code: str,
    params: dict = None,
    columns: list = None,
    tests: dict = None,
):
    """Create a pandas DataFrame from the Eurostat HTTPS REST API.

    No credentials are required; the API is public.

    Args:
        dataset_code (str): The code of the Eurostat dataset to be downloaded.
        params (dict, optional):
            A dictionary with optional URL parameters. The key represents the
            parameter ID, while the value is the code for a specific parameter,
            for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
            to set and "EUR" is the specific parameter code. You can add more
            than one parameter, but only one code per parameter! So you CANNOT
            provide a list of codes, e.g., 'params = {'unit': ['EUR', 'USD',
            'PLN']}'. This parameter is REQUIRED in most cases to pull a
            specific dataset from the API. Both the parameter and code must be
            provided as a string! Defaults to None.
        columns (list, optional): List of needed column names. Names should
            be given as strings. Defaults to None.
        tests (dict, optional): Dictionary of validation tests to run on the
            downloaded DataFrame. Supported keys:
            - `column_size`: dict{column: size}
            - `column_unique_values`: list[columns]
            - `column_list_to_match`: list[columns]
            - `dataset_row_count`: dict: {'min': number, 'max': number}
            - `column_match_regex`: dict: {column: 'regex'}
            - `column_sum`: dict: {column: {'min': number, 'max': number}}
            Defaults to None.

    Returns:
        pd.DataFrame: Pandas DataFrame with the API response.
    """
    # Delegate downloading, filtering, and validation to the Eurostat source.
    return Eurostat(
        dataset_code=dataset_code,
        params=params,
        columns=columns,
        tests=tests,
    ).to_df()
3 changes: 2 additions & 1 deletion src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .sql_server import SQLServer
from .trino import Trino
from .uk_carbon_intensity import UKCarbonIntensity

from .eurostat import Eurostat

__all__ = [
"CloudForCustomers",
Expand All @@ -26,6 +26,7 @@
"Trino",
"SQLServer",
"UKCarbonIntensity",
"Eurostat",
]

if find_spec("adlfs"):
Expand Down
Loading
Loading