From 849247717f96afede943f01a20a881b91210c7a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=B3jcik?= <107313911+adrian-wojcik@users.noreply.github.com> Date: Fri, 4 Oct 2024 10:28:22 +0200 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=20Add=20`VidClub`=20source=20and?= =?UTF-8?q?=20Prefect=20tasks=20(#1044)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🚀 Add Vid Club connector with tests * removed prefect dependency from source * super init passing credentials * 🎨 Changed `source` to `endpoint` * 🎨 Change imports structure * 🐛 Fix import bug * ♻️ Refactor credential passing method with returning data frame on task level * 🎨 Improved code structure for VidClub source * 🐛 Modified building `url` * ✅ Improved tests code structure * 🎨 Modified code structure * 🎨 Modified code structure * 🎨 Modified code structure * 🎨 Moved description to the new line * 🎨 Added `# pragma: allowlist secret` * 🎨 Modified loggers and added `if_empty` param * 🔥 Removed logging --------- Co-authored-by: adrian-wojcik Co-authored-by: fdelgadodyvenia Co-authored-by: rziemianek Co-authored-by: Rafał Ziemianek <49795849+Rafalz13@users.noreply.github.com> --- .../orchestration/prefect/flows/__init__.py | 2 + .../prefect/flows/vid_club_to_adls.py | 98 +++++ .../orchestration/prefect/tasks/__init__.py | 2 + .../orchestration/prefect/tasks/vid_club.py | 79 ++++ src/viadot/sources/__init__.py | 2 + src/viadot/sources/vid_club.py | 390 ++++++++++++++++++ .../prefect/flows/test_vid_club.py | 31 ++ .../prefect/tasks/test_vid_club.py | 52 +++ tests/unit/test_vid_club.py | 60 +++ 9 files changed, 716 insertions(+) create mode 100644 src/viadot/orchestration/prefect/flows/vid_club_to_adls.py create mode 100644 src/viadot/orchestration/prefect/tasks/vid_club.py create mode 100644 src/viadot/sources/vid_club.py create mode 100644 tests/integration/orchestration/prefect/flows/test_vid_club.py create mode 100644 tests/integration/orchestration/prefect/tasks/test_vid_club.py create mode 100644 tests/unit/test_vid_club.py diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index acc981caf..a2d489ab9 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -30,6 +30,7 @@ from .supermetrics_to_adls import supermetrics_to_adls from .transform import transform from .transform_and_catalog import transform_and_catalog +from .vid_club_to_adls import vid_club_to_adls __all__ = [ @@ -63,4 +64,5 @@ "supermetrics_to_adls", "transform", "transform_and_catalog", + "vid_club_to_adls", ] diff --git a/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py new file mode 100644 index 000000000..8ba05ab25 --- /dev/null +++ b/src/viadot/orchestration/prefect/flows/vid_club_to_adls.py @@ -0,0 +1,98 @@ +"""Download data from Vid CLub API and load it into Azure Data Lake Storage.""" + +from typing import Any, Literal + +from prefect import flow +from prefect.task_runners import ConcurrentTaskRunner + +from viadot.orchestration.prefect.tasks import df_to_adls, vid_club_to_df + + +@flow( + name="Vid CLub extraction to ADLS", + description="Extract data from Vid CLub and load it into Azure Data Lake Storage.", + retries=1, + retry_delay_seconds=60, + task_runner=ConcurrentTaskRunner, +) +def vid_club_to_adls( # noqa: PLR0913 + *args: list[Any], + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, 
+    from_date: str = "2022-03-22",
+    to_date: str | None = None,
+    items_per_page: int = 100,
+    region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None,
+    days_interval: int = 30,
+    cols_to_drop: list[str] | None = None,
+    config_key: str | None = None,
+    azure_key_vault_secret: str | None = None,
+    adls_config_key: str | None = None,
+    adls_azure_key_vault_secret: str | None = None,
+    adls_path: str | None = None,
+    adls_path_overwrite: bool = False,
+    validate_df_dict: dict | None = None,
+    timeout: int = 3600,
+    **kwargs: dict[str, Any],
+) -> None:
+    """Download data from the Vid Club API and load it into Azure Data Lake Storage.
+
+    Args:
+        endpoint (Literal["jobs", "product", "company", "survey"], optional): The
+            endpoint source to be accessed. Defaults to None.
+        from_date (str, optional): Start date for the query. Defaults to
+            "2022-03-22", the oldest date in the data.
+        to_date (str, optional): End date for the query. Defaults to None, in which
+            case datetime.today().strftime("%Y-%m-%d") is used.
+        items_per_page (int, optional): Number of entries per page. Defaults to 100.
+        region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region
+            filter for the query. Defaults to None (parameter is not used in url).
+            [December 2023 status: value 'all' does not work for company and jobs]
+        days_interval (int, optional): Days covered per API call in the date range
+            (tests showed that 30-40 is optimal for performance). Defaults to 30.
+        cols_to_drop (list[str], optional): List of columns to drop. Defaults to None.
+        config_key (str, optional): The key in the viadot config holding relevant
+            credentials. Defaults to None.
+        azure_key_vault_secret (str, optional): The name of the Azure Key Vault
+            secret where credentials are stored. Defaults to None.
+        adls_config_key (str, optional): The key in the viadot config holding
+            relevant ADLS credentials. Defaults to None.
+        adls_azure_key_vault_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with ACCOUNT_NAME and Service Principal
+            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
+            Defaults to None.
+        adls_path (str, optional): Azure Data Lake destination file path.
+            Defaults to None.
+        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
+            Defaults to False.
+        validate_df_dict (dict, optional): A dictionary with an optional list of tests
+            to verify the output dataframe. If defined, triggers the `validate_df`
+            task from task_utils. Defaults to None.
+        timeout (int, optional): The time (in seconds) to wait while running this task
+            before a timeout occurs. Defaults to 3600.
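+
+    Example:
+        An illustrative call only; the secret names and the ADLS path below are
+        placeholders, not values shipped with viadot::
+
+            vid_club_to_adls(
+                endpoint="jobs",
+                from_date="2023-01-01",
+                to_date="2023-01-31",
+                azure_key_vault_secret="vidclub-credentials",
+                adls_azure_key_vault_secret="adls-credentials",
+                adls_path="raw/vid_club/jobs.parquet",
+            )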
+ """ + data_frame = vid_club_to_df( + args=args, + endpoint=endpoint, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + days_interval=days_interval, + cols_to_drop=cols_to_drop, + config_key=config_key, + azure_key_vault_secret=azure_key_vault_secret, + validate_df_dict=validate_df_dict, + timeout=timeout, + kawrgs=kwargs, + ) + + return df_to_adls( + df=data_frame, + path=adls_path, + credentials_secret=adls_azure_key_vault_secret, + config_key=adls_config_key, + overwrite=adls_path_overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 8cdc4b52a..db43224a7 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -27,6 +27,7 @@ from .sharepoint import sharepoint_download_file, sharepoint_to_df from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df from .supermetrics import supermetrics_to_df +from .vid_club import vid_club_to_df __all__ = [ @@ -62,5 +63,6 @@ "sharepoint_to_df", "sql_server_query", "sql_server_to_df", + "vid_club_to_df", "supermetrics_to_df", ] diff --git a/src/viadot/orchestration/prefect/tasks/vid_club.py b/src/viadot/orchestration/prefect/tasks/vid_club.py new file mode 100644 index 000000000..b74380c7f --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/vid_club.py @@ -0,0 +1,79 @@ +"""Task for downloading data from Vid Club Cloud API.""" + +from typing import Any, Literal + +import pandas as pd +from prefect import task + +from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError +from viadot.orchestration.prefect.utils import get_credentials +from viadot.sources import VidClub + + +@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=2 * 60 * 60) +def vid_club_to_df( # noqa: PLR0913 + *args: list[Any], + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + from_date: str = "2022-03-22", + to_date: str | None = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + days_interval: int = 30, + cols_to_drop: list[str] | None = None, + azure_key_vault_secret: str | None = None, + adls_config_key: str | None = None, + validate_df_dict: dict | None = None, + timeout: int = 3600, + **kwargs: dict[str, Any], +) -> pd.DataFrame: + """Task to downloading data from Vid Club APIs to Pandas DataFrame. + + Args: + endpoint (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the oldest + date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. Defaults to 100. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region + filter for the query. Defaults to None (parameter is not used in url). + [December 2023 status: value 'all' does not work for company and jobs] + days_interval (int, optional): Days specified in date range per API call + (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. 
+ azure_key_vault_secret (Optional[str], optional): The name of the Azure Key + Vault secret where credentials are stored. Defaults to None. + validate_df_dict (dict, optional): A dictionary with optional list of tests + to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. + Defaults to None. + timeout (int, optional): The time (in seconds) to wait while running this task + before a timeout occurs. Defaults to 3600. + + Returns: Pandas DataFrame + """ + if not (azure_key_vault_secret or adls_config_key): + raise MissingSourceCredentialsError + + if not adls_config_key: + credentials = get_credentials(azure_key_vault_secret) + + vc_obj = VidClub( + args=args, + endpoint=endpoint, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + days_interval=days_interval, + cols_to_drop=cols_to_drop, + vid_club_credentials=credentials, + validate_df_dict=validate_df_dict, + timeout=timeout, + kwargs=kwargs, + ) + + return vc_obj.to_df() diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 789b8ca6a..08615a9f3 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -20,6 +20,7 @@ from .sql_server import SQLServer from .supermetrics import Supermetrics, SupermetricsCredentials from .uk_carbon_intensity import UKCarbonIntensity +from .vid_club import VidClub __all__ = [ @@ -41,6 +42,7 @@ "SupermetricsCredentials", # pragma: allowlist-secret "Trino", "UKCarbonIntensity", + "VidClub", ] if find_spec("adlfs"): from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401 diff --git a/src/viadot/sources/vid_club.py b/src/viadot/sources/vid_club.py new file mode 100644 index 000000000..db0adca08 --- /dev/null +++ b/src/viadot/sources/vid_club.py @@ -0,0 +1,390 @@ +"""Vid Club Cloud API connector.""" + +from datetime import datetime, timedelta +from typing import Any, Literal + +import pandas as pd + +from viadot.exceptions import ValidationError +from viadot.sources.base import Source +from viadot.utils import handle_api_response + + +class VidClub(Source): + """A class implementing the Vid Club API. + + Documentation for this API is located at: https://evps01.envoo.net/vipapi/ + There are 4 endpoints where to get the data. + """ + + def __init__( + self, + *args, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + from_date: str = "2022-03-22", + to_date: str | None = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + days_interval: int = 30, + cols_to_drop: list[str] | None = None, + vid_club_credentials: dict[str, Any] | None = None, + validate_df_dict: dict | None = None, + timeout: int = 3600, + **kwargs, + ): + """Create an instance of VidClub. + + Args: + endpoint (Literal["jobs", "product", "company", "survey"], optional): The + endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the + oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. Defaults to 100. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). 
[December 2023 status: value 'all' + does not work for company and jobs] + days_interval (int, optional): Days specified in date range per API call + (test showed that 30-40 is optimal for performance). Defaults to 30. + cols_to_drop (List[str], optional): List of columns to drop. + Defaults to None. + vid_club_credentials (Dict[str, Any], optional): Stores the credentials + information. Defaults to None. + validate_df_dict (dict, optional): A dictionary with optional list of tests + to verify the output + dataframe. If defined, triggers the `validate_df` task from task_utils. + Defaults to None. + timeout (int, optional): The time (in seconds) to wait while running this + task before a timeout occurs. Defaults to 3600. + """ + self.endpoint = endpoint + self.from_date = from_date + self.to_date = to_date + self.items_per_page = items_per_page + self.region = region + self.days_interval = days_interval + self.cols_to_drop = cols_to_drop + self.vid_club_credentials = vid_club_credentials + self.validate_df_dict = validate_df_dict + self.timeout = timeout + + self.headers = { + "Authorization": "Bearer " + vid_club_credentials["token"], + "Content-Type": "application/json", + } + + super().__init__(credentials=vid_club_credentials, *args, **kwargs) # noqa: B026 + + def build_query( + self, + from_date: str, + to_date: str, + api_url: str, + items_per_page: int, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + ) -> str: + """Builds the query from the inputs. + + Args: + from_date (str): Start date for the query. + to_date (str): End date for the query, if empty, will be executed as + datetime.today().strftime("%Y-%m-%d"). + api_url (str): Generic part of the URL to Vid Club API. + items_per_page (int): number of entries per page. + endpoint (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + + Returns: + str: Final query with all filters added. + + Raises: + ValidationError: If any source different than the ones in the list are used. + """ + if endpoint in ["jobs", "product", "company"]: + region_url_string = f"®ion={region}" if region else "" + url = ( + f"""{api_url}{endpoint}?from={from_date}&to={to_date}""" + f"""{region_url_string}&limit={items_per_page}""" + ) + elif endpoint == "survey": + url = f"{api_url}{endpoint}?language=en&type=question" + else: + msg = "Pick one these sources: jobs, product, company, survey" + raise ValidationError(msg) + return url + + def intervals( + self, from_date: str, to_date: str, days_interval: int + ) -> tuple[list[str], list[str]]: + """Breaks dates range into smaller by provided days interval. + + Args: + from_date (str): Start date for the query in "%Y-%m-%d" format. + to_date (str): End date for the query, if empty, will be executed as + datetime.today().strftime("%Y-%m-%d"). + days_interval (int): Days specified in date range per api call + (test showed that 30-40 is optimal for performance). + + Returns: + List[str], List[str]: Starts and Ends lists that contains information + about date ranges for specific period and time interval. + + Raises: + ValidationError: If the final date of the query is before the start date. 
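+
+        Example:
+            A 15-day range with ``days_interval=5`` is split into three
+            consecutive sub-ranges; the values below match the unit tests
+            shipped with this change::
+
+                intervals("2023-01-01", "2023-01-15", days_interval=5)
+                # (["2023-01-01", "2023-01-06", "2023-01-11"],
+                #  ["2023-01-06", "2023-01-11", "2023-01-15"])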
+ """ + if to_date is None: + to_date = datetime.today().strftime("%Y-%m-%d") + + end_date = datetime.strptime(to_date, "%Y-%m-%d").date() + start_date = datetime.strptime(from_date, "%Y-%m-%d").date() + + from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") + + to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") + delta = to_date_obj - from_date_obj + + if delta.days < 0: + msg = "to_date cannot be earlier than from_date." + raise ValidationError(msg) + + interval = timedelta(days=days_interval) + starts = [] + ends = [] + + period_start = start_date + while period_start < end_date: + period_end = min(period_start + interval, end_date) + starts.append(period_start.strftime("%Y-%m-%d")) + ends.append(period_end.strftime("%Y-%m-%d")) + period_start = period_end + if len(starts) == 0 and len(ends) == 0: + starts.append(from_date) + ends.append(to_date) + return starts, ends + + def check_connection( + self, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + from_date: str = "2022-03-22", + to_date: str | None = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + url: str | None = None, + ) -> tuple[dict[str, Any], str]: + """Initiate first connection to API to retrieve piece of data. + + With information about type of pagination in API URL. + This option is added because type of pagination for endpoints is being changed + in the future from page number to 'next' id. + + Args: + endpoint (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the + oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. + 100 entries by default. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + url (str, optional): Generic part of the URL to Vid Club API. + Defaults to None. + + Returns: + Tuple[Dict[str, Any], str]: Dictionary with first response from API with + JSON containing data and used URL string. + + Raises: + ValidationError: If from_date is earlier than 2022-03-22. + ValidationError: If to_date is earlier than from_date. + """ + if from_date < "2022-03-22": + msg = "from_date cannot be earlier than 2022-03-22." + raise ValidationError(msg) + + if to_date < from_date: + msg = "to_date cannot be earlier than from_date." + raise ValidationError(msg) + + if url is None: + url = self.credentials["url"] + + first_url = self.build_query( + endpoint=endpoint, + from_date=from_date, + to_date=to_date, + api_url=url, + items_per_page=items_per_page, + region=region, + ) + headers = self.headers + response = handle_api_response(url=first_url, headers=headers, method="GET") + response = response.json() + return (response, first_url) + + def get_response( + self, + endpoint: Literal["jobs", "product", "company", "survey"] | None = None, + from_date: str = "2022-03-22", + to_date: str | None = None, + items_per_page: int = 100, + region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] | None = None, + ) -> pd.DataFrame: + """Basing on the pagination type retrieved using check_connection function. 
+ + It gets the response from the API queried and transforms it into DataFrame. + + Args: + endpoint (Literal["jobs", "product", "company", "survey"], optional): + The endpoint source to be accessed. Defaults to None. + from_date (str, optional): Start date for the query, by default is the + oldest date in the data 2022-03-22. + to_date (str, optional): End date for the query. By default None, + which will be executed as datetime.today().strftime("%Y-%m-%d") in code. + items_per_page (int, optional): Number of entries per page. + 100 entries by default. + region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): + Region filter for the query. Defaults to None + (parameter is not used in url). [December 2023 status: value 'all' + does not work for company and jobs] + + Returns: + pd.DataFrame: Table of the data carried in the response. + + Raises: + ValidationError: If any source different than the ones in the list are used. + """ + headers = self.headers + if endpoint not in ["jobs", "product", "company", "survey"]: + msg = "The source has to be: jobs, product, company or survey" + raise ValidationError(msg) + if to_date is None: + to_date = datetime.today().strftime("%Y-%m-%d") + + response, first_url = self.check_connection( + endpoint=endpoint, + from_date=from_date, + to_date=to_date, + items_per_page=items_per_page, + region=region, + ) + + if isinstance(response, dict): + keys_list = list(response.keys()) + elif isinstance(response, list): + keys_list = list(response[0].keys()) + else: + keys_list = [] + + ind = "next" in keys_list + + if "data" in keys_list: + df = pd.json_normalize(response["data"]) + df = pd.DataFrame(df) + length = df.shape[0] + page = 1 + + while length == items_per_page: + if ind is True: + next_page = response["next"] + url = f"{first_url}&next={next_page}" + else: + page += 1 + url = f"{first_url}&page={page}" + response_api = handle_api_response( + url=url, headers=headers, method="GET" + ) + response = response_api.json() + df_page = pd.json_normalize(response["data"]) + df_page = pd.DataFrame(df_page) + if endpoint == "product": + df_page = df_page.transpose() + length = df_page.shape[0] + df = pd.concat((df, df_page), axis=0) + else: + df = pd.DataFrame(response) + + return df + + def to_df( + self, + if_empty: Literal["warn", "skip", "fail"] = "warn", + ) -> pd.DataFrame: + """Looping get_response and iterating by date ranges defined in intervals. + + Stores outputs as DataFrames in a list. At the end, daframes are concatenated + in one and dropped duplicates that would appear when quering. + + Args: + if_empty (Literal["warn", "skip", "fail"], optional): What to do if a fetch + produce no data. Defaults to "warn + + Returns: + pd.DataFrame: Dataframe of the concatanated data carried in the responses. 
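+
+        Example:
+            A minimal usage sketch; the credentials dictionary shape (``url``
+            and ``token`` keys) is the one this connector reads, and the values
+            shown are placeholders::
+
+                vc = VidClub(
+                    endpoint="jobs",
+                    from_date="2023-01-01",
+                    to_date="2023-01-31",
+                    vid_club_credentials={"url": "<api-url>", "token": "<token>"},
+                )
+                df = vc.to_df(if_empty="warn")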
+ """ + starts, ends = self.intervals( + from_date=self.from_date, + to_date=self.to_date, + days_interval=self.days_interval, + ) + + dfs_list = [] + if len(starts) > 0 and len(ends) > 0: + for start, end in zip(starts, ends, strict=False): + self.logger.info(f"ingesting data for dates [{start}]-[{end}]...") + df = self.get_response( + endpoint=self.endpoint, + from_date=start, + to_date=end, + items_per_page=self.items_per_page, + region=self.region, + ) + dfs_list.append(df) + if len(dfs_list) > 1: + df = pd.concat(dfs_list, axis=0, ignore_index=True) + else: + df = pd.DataFrame(dfs_list[0]) + else: + df = self.get_response( + endpoint=self.endpoint, + from_date=self.from_date, + to_date=self.to_date, + items_per_page=self.items_per_page, + region=self.region, + ) + list_columns = df.columns[df.map(lambda x: isinstance(x, list)).any()].tolist() + for i in list_columns: + df[i] = df[i].apply(lambda x: tuple(x) if isinstance(x, list) else x) + df.drop_duplicates(inplace=True) + + if self.cols_to_drop is not None: + if isinstance(self.cols_to_drop, list): + try: + self.logger.info( + f"Dropping following columns: {self.cols_to_drop}..." + ) + df.drop(columns=self.cols_to_drop, inplace=True, errors="raise") + except KeyError: + self.logger.exception( + f"""Column(s): {self.cols_to_drop} don't exist in the DataFrame. + No columns were dropped. Returning full DataFrame...""" + ) + self.logger.info(f"Existing columns: {df.columns}") + else: + msg = "Provide columns to drop in a List." + raise TypeError(msg) + + if df.empty: + self._handle_if_empty(if_empty=if_empty) + + return df diff --git a/tests/integration/orchestration/prefect/flows/test_vid_club.py b/tests/integration/orchestration/prefect/flows/test_vid_club.py new file mode 100644 index 000000000..7053e04f1 --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_vid_club.py @@ -0,0 +1,31 @@ +from src.viadot.orchestration.prefect.flows import vid_club_to_adls +from src.viadot.sources import AzureDataLake + + +TEST_FILE_PATH = "test/path/to/adls.parquet" +TEST_SOURCE = "jobs" +TEST_FROM_DATE = "2023-01-01" +TEST_TO_DATE = "2023-12-31" +ADLS_CREDENTIALS_SECRET = "test_adls_secret" # pragma: allowlist secret # noqa: S105 +VIDCLUB_CREDENTIALS_SECRET = ( + "test_vidclub_secret" # pragma: allowlist secret # noqa: S105 +) + + +def test_vid_club_to_adls(): + lake = AzureDataLake(config_key="adls_test") + + assert not lake.exists(TEST_FILE_PATH) + + vid_club_to_adls( + endpoint=TEST_SOURCE, + from_date=TEST_FROM_DATE, + to_date=TEST_TO_DATE, + adls_path=TEST_FILE_PATH, + adls_azure_key_vault_secret=ADLS_CREDENTIALS_SECRET, + vidclub_credentials_secret=VIDCLUB_CREDENTIALS_SECRET, + ) + + assert lake.exists(TEST_FILE_PATH) + + lake.rm(TEST_FILE_PATH) diff --git a/tests/integration/orchestration/prefect/tasks/test_vid_club.py b/tests/integration/orchestration/prefect/tasks/test_vid_club.py new file mode 100644 index 000000000..2d39001bc --- /dev/null +++ b/tests/integration/orchestration/prefect/tasks/test_vid_club.py @@ -0,0 +1,52 @@ +import pandas as pd +import pytest + +from src.viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError +from src.viadot.orchestration.prefect.tasks import vid_club_to_df + + +EXPECTED_DF = pd.DataFrame( + {"id": [1, 2], "name": ["Company A", "Company B"], "region": ["pl", "ro"]} +) + + +class MockVidClub: + def __init__(self, *args, **kwargs): + """Init method.""" + pass + + def to_df(self): + return EXPECTED_DF + + +def test_vid_club_to_df(mocker): + 
mocker.patch("viadot.orchestration.prefect.tasks.VidClub", new=MockVidClub) + + df = vid_club_to_df( + endpoint="company", + from_date="2023-01-01", + to_date="2023-12-31", + items_per_page=100, + region="pl", + vidclub_credentials_secret="VIDCLUB", # pragma: allowlist secret # noqa: S106 + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert df.equals(EXPECTED_DF) + + +def test_vid_club_to_df_missing_credentials(mocker): + mocker.patch( + "viadot.orchestration.prefect.tasks.get_credentials", return_value=None + ) + + with pytest.raises(MissingSourceCredentialsError): + vid_club_to_df( + endpoint="company", + from_date="2023-01-01", + to_date="2023-12-31", + items_per_page=100, + region="pl", + vidclub_credentials_secret="VIDCLUB", # pragma: allowlist secret # noqa: S106 + ) diff --git a/tests/unit/test_vid_club.py b/tests/unit/test_vid_club.py new file mode 100644 index 000000000..db8b27b29 --- /dev/null +++ b/tests/unit/test_vid_club.py @@ -0,0 +1,60 @@ +import unittest + +import pytest + +from viadot.sources.vid_club import ValidationError, VidClub + + +class TestVidClub(unittest.TestCase): + def setUp(self): + """Setup VidClub instance before each test.""" + # Sample input data for the constructor + self.vid_club = VidClub( + endpoint="jobs", vid_club_credentials={"token": "test-token"} + ) + + def test_build_query(self): + """Test correct URL generation for the 'jobs' endpoint.""" + # Sample input data for the build_query method + from_date = "2023-01-01" + to_date = "2023-01-31" + api_url = "https://example.com/api/" + items_per_page = 50 + endpoint = "jobs" + region = "pl" + + # Expected result URL + expected_url = "https://example.com/api/jobs?from=2023-01-01&to=2023-01-31®ion=pl&limit=50" + + # Check if the method returns the correct URL + result_url = self.vid_club.build_query( + from_date, to_date, api_url, items_per_page, endpoint, region + ) + assert result_url == expected_url + + def test_intervals(self): + """Test breaking date range into intervals based on the days_interval.""" + # Sample input data for the intervals method + from_date = "2023-01-01" + to_date = "2023-01-15" + days_interval = 5 + + # Expected starts and ends lists + expected_starts = ["2023-01-01", "2023-01-06", "2023-01-11"] + expected_ends = ["2023-01-06", "2023-01-11", "2023-01-15"] + + # Check if the method returns correct intervals + starts, ends = self.vid_club.intervals(from_date, to_date, days_interval) + assert starts == expected_starts + assert ends == expected_ends + + def test_intervals_invalid_date_range(self): + """Test that ValidationError is raised when to_date is before from_date.""" + # Sample input data where to_date is before from_date + from_date = "2023-01-15" + to_date = "2023-01-01" + days_interval = 5 + + # Check if ValidationError is raised + with pytest.raises(ValidationError): + self.vid_club.intervals(from_date, to_date, days_interval) From 4fe8913b8685c1fdbeb78f706b16020c68095dac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=B3jcik?= <107313911+adrian-wojcik@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:40:07 +0200 Subject: [PATCH 2/3] =?UTF-8?q?=E2=9C=A8=20Add=20`AzureSQL`=20source=20and?= =?UTF-8?q?=20Prefect=20tasks=20(#1043)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🚀 Adding `Aselite` connector with integration tests * 🚀 Add unit tests for Aselite (Azure SQL) * 🐛 Fix task utils bug * 🎨 Refactor task utils file to pass code_checker * removed prefect dependency from azure_sql source * Bug 
located into the return task * ♻️ Change flow and task name to `azure_sql` * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update src/viadot/orchestration/prefect/tasks/azure_sql.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py Co-authored-by: Michał Zawadzki * Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py Co-authored-by: Michał Zawadzki * 🐛 Fix task tests bugs * 🐛 Fix bugs in azure sql unit tests * 🎨 Changed docstring description for parameter `convert_bytes` * 🔥 Remove task tests as all of it is covered in unit tests * 🎨 Improved structure of the `AzureSQL` source class and added docstring * ✅ Modified tests structure * 🎨 Removed unused parameters and improved structure of the code * 🎨 Removed unused parameters and improved structure of the flow code * 🎨 Improved structure of the tests code * 🎨 Improved structure of the `__init__` files * 🎨 Added extra spaces in `chunk_df` * 🚧 Added `pragma: allowlist secret` * 🚧 Added `# noqa: S105` * 🚧 Added `pragma: allowlist secret` * 🚨 Fix linter and pre-commit errors * 🐛 Removed `src` * ✅ Updated tests * 🐛 Fixed names * 🐛 Added fixtures * Update tests/unit/test_azure_sql.py Co-authored-by: Michał Zawadzki * 🎨 Moved operations from task to source and created new `to_df()` method * 🎨 Changed to `map` and added `super()` in to_df method * 📝 Add connector documentation --------- Co-authored-by: adrian-wojcik Co-authored-by: fdelgadodyvenia Co-authored-by: Michał Zawadzki Co-authored-by: rziemianek Co-authored-by: Rafał Ziemianek <49795849+Rafalz13@users.noreply.github.com> --- .../references/orchestration/prefect/flows.md | 2 + .../references/orchestration/prefect/tasks.md | 2 + 
docs/references/sources/sql.md | 2 + .../orchestration/prefect/flows/__init__.py | 2 + .../prefect/flows/azure_sql_to_adls.py | 77 ++++++++ .../orchestration/prefect/tasks/__init__.py | 2 + .../orchestration/prefect/tasks/azure_sql.py | 70 ++++++++ .../orchestration/prefect/tasks/task_utils.py | 41 +---- src/viadot/sources/__init__.py | 2 + src/viadot/sources/azure_sql.py | 165 ++++++++++++++++++ src/viadot/utils.py | 44 +++++ .../prefect/flows/test_azure_sql_to_adls.py | 88 ++++++++++ tests/unit/test_azure_sql.py | 73 ++++++++ tests/unit/test_utils.py | 55 ++++++ 14 files changed, 588 insertions(+), 37 deletions(-) create mode 100644 src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py create mode 100644 src/viadot/orchestration/prefect/tasks/azure_sql.py create mode 100644 src/viadot/sources/azure_sql.py create mode 100644 tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py create mode 100644 tests/unit/test_azure_sql.py diff --git a/docs/references/orchestration/prefect/flows.md b/docs/references/orchestration/prefect/flows.md index e1c291f72..0ccc75105 100644 --- a/docs/references/orchestration/prefect/flows.md +++ b/docs/references/orchestration/prefect/flows.md @@ -43,3 +43,5 @@ ::: viadot.orchestration.prefect.flows.sql_server_to_minio ::: viadot.orchestration.prefect.flows.sql_server_to_parquet + +::: viadot.orchestration.prefect.flows.azure_sql_to_adls diff --git a/docs/references/orchestration/prefect/tasks.md b/docs/references/orchestration/prefect/tasks.md index 67e6cccdb..002c8852f 100644 --- a/docs/references/orchestration/prefect/tasks.md +++ b/docs/references/orchestration/prefect/tasks.md @@ -43,3 +43,5 @@ ::: viadot.orchestration.prefect.tasks.sql_server_query ::: viadot.orchestration.prefect.tasks.sql_server_to_df + +::: viadot.orchestration.prefect.tasks.azure_sql_to_df diff --git a/docs/references/sources/sql.md b/docs/references/sources/sql.md index 80b107ef3..114bf97a2 100644 --- a/docs/references/sources/sql.md +++ b/docs/references/sources/sql.md @@ -23,3 +23,5 @@ ::: viadot.sources.sap_rfc.SAPRFC ::: viadot.sources.sap_rfc.SAPRFCV2 + +::: viadot.sources.azure_sql.AzureSQL diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index a2d489ab9..fce71f78f 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -1,5 +1,6 @@ """Import flows.""" +from .azure_sql_to_adls import azure_sql_to_adls from .bigquery_to_adls import bigquery_to_adls from .cloud_for_customers_to_adls import cloud_for_customers_to_adls from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks @@ -34,6 +35,7 @@ __all__ = [ + "azure_sql_to_adls", "bigquery_to_adls", "cloud_for_customers_to_adls", "cloud_for_customers_to_databricks", diff --git a/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py b/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py new file mode 100644 index 000000000..591ffe3e8 --- /dev/null +++ b/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py @@ -0,0 +1,77 @@ +"""Flows for downloading data from Azure SQL and uploading it to Azure ADLS.""" + +from typing import Any + +from prefect import flow +from prefect.task_runners import ConcurrentTaskRunner + +from viadot.orchestration.prefect.tasks import azure_sql_to_df, df_to_adls + + +@flow( + name="Azure SQL extraction to ADLS", + description="Extract data from Azure SQL" + + " and load it into Azure Data Lake Storage.", + 
retries=1,
+    retry_delay_seconds=60,
+    task_runner=ConcurrentTaskRunner,
+    log_prints=True,
+)
+def azure_sql_to_adls(
+    query: str | None = None,
+    credentials_secret: str | None = None,
+    validate_df_dict: dict[str, Any] | None = None,
+    convert_bytes: bool = False,
+    remove_special_characters: bool | None = None,
+    columns_to_clean: list[str] | None = None,
+    adls_config_key: str | None = None,
+    adls_azure_key_vault_secret: str | None = None,
+    adls_path: str | None = None,
+    adls_path_overwrite: bool = False,
+) -> None:
+    r"""Download data from Azure SQL and upload it to Azure Data Lake Storage.
+
+    Args:
+        query (str, optional): Query to perform on a database. Defaults to None.
+        credentials_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with database credentials.
+            Defaults to None.
+        validate_df_dict (dict, optional): A dictionary with an optional list of
+            tests to verify the output dataframe. If defined, triggers the
+            `validate_df` task from task_utils. Defaults to None.
+        convert_bytes (bool, optional): Whether to run `df_converts_bytes_to_int`,
+            which converts bytes columns to int; pulling bytes data can otherwise
+            lead to malformed values in the data frame. Defaults to False.
+        remove_special_characters (bool, optional): Whether to remove special
+            characters, such as escape symbols, from the data. Defaults to None.
+        columns_to_clean (list[str], optional): Columns to clean, used together with
+            remove_special_characters. If None, the whole data frame is processed.
+            Defaults to None.
+        adls_config_key (str, optional): The key in the viadot config holding
+            relevant ADLS credentials. Defaults to None.
+        adls_azure_key_vault_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with ACCOUNT_NAME and Service Principal
+            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
+            Defaults to None.
+        adls_path (str, optional): Azure Data Lake destination file path (with
+            file name). Defaults to None.
+        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
+            Defaults to False.
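+
+    Example:
+        An illustrative call only; the query, secret names, and ADLS path below
+        are placeholders::
+
+            azure_sql_to_adls(
+                query="SELECT * FROM sales.orders",
+                credentials_secret="azure-sql-credentials",
+                adls_azure_key_vault_secret="adls-credentials",
+                adls_path="raw/azure_sql/orders.parquet",
+            )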
+ """ + data_frame = azure_sql_to_df( + query=query, + credentials_secret=credentials_secret, + validate_df_dict=validate_df_dict, + convert_bytes=convert_bytes, + remove_special_characters=remove_special_characters, + columns_to_clean=columns_to_clean, + ) + + return df_to_adls( + df=data_frame, + path=adls_path, + credentials_secret=adls_azure_key_vault_secret, + config_key=adls_config_key, + overwrite=adls_path_overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index db43224a7..7250c17f4 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -1,6 +1,7 @@ """Imports.""" from .adls import adls_upload, df_to_adls +from .azure_sql import azure_sql_to_df from .bcp import bcp from .bigquery import bigquery_to_df from .cloud_for_customers import cloud_for_customers_to_df @@ -31,6 +32,7 @@ __all__ = [ + "azure_sql_to_df", "adls_upload", "bcp", "clone_repo", diff --git a/src/viadot/orchestration/prefect/tasks/azure_sql.py b/src/viadot/orchestration/prefect/tasks/azure_sql.py new file mode 100644 index 000000000..f065b680d --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/azure_sql.py @@ -0,0 +1,70 @@ +"""Task for downloading data from Azure SQL.""" + +from typing import Any, Literal + +import pandas as pd +from prefect import task + +from viadot.orchestration.prefect.utils import get_credentials +from viadot.sources import AzureSQL +from viadot.utils import validate + + +@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60) +def azure_sql_to_df( + query: str | None = None, + credentials_secret: str | None = None, + validate_df_dict: dict[str, Any] | None = None, + convert_bytes: bool = False, + remove_special_characters: bool | None = None, + columns_to_clean: list[str] | None = None, + if_empty: Literal["warn", "skip", "fail"] = "warn", +) -> pd.DataFrame: + r"""Task to download data from Azure SQL. + + Args: + query (str): Query to perform on a database. Defaults to None. + credentials_secret (str, optional): The name of the Azure Key Vault + secret containing a dictionary with database credentials. + Defaults to None. + validate_df_dict (Dict[str], optional): A dictionary with optional list of + tests to verify the output dataframe. If defined, triggers the `validate_df` + task from task_utils. Defaults to None. + convert_bytes (bool). A boolean value to trigger method df_converts_bytes_to_int + It is used to convert bytes data type into int, as pulling data with bytes + can lead to malformed data in data frame. + Defaults to False. + remove_special_characters (str, optional): Call a function that remove + special characters like escape symbols. Defaults to None. + columns_to_clean (List(str), optional): Select columns to clean, used with + remove_special_characters. If None whole data frame will be processed. + Defaults to None. + if_empty (Literal["warn", "skip", "fail"], optional): What to do if the + query returns no data. Defaults to None. + + Raises: + ValueError: Raising ValueError if credentials_secret is not provided + + Returns: + pd.DataFrame: The response data as a pandas DataFrame. + """ + if not credentials_secret: + msg = "`credentials_secret` has to be specified and not empty." 
+ raise ValueError(msg) + + credentials = get_credentials(credentials_secret) + + azure_sql = AzureSQL(credentials=credentials) + + df = azure_sql.to_df( + query=query, + if_empty=if_empty, + convert_bytes=convert_bytes, + remove_special_characters=remove_special_characters, + columns_to_clean=columns_to_clean, + ) + + if validate_df_dict is not None: + validate(df=df, tests=validate_df_dict) + + return df diff --git a/src/viadot/orchestration/prefect/tasks/task_utils.py b/src/viadot/orchestration/prefect/tasks/task_utils.py index e6f958311..4f4b2a6d6 100644 --- a/src/viadot/orchestration/prefect/tasks/task_utils.py +++ b/src/viadot/orchestration/prefect/tasks/task_utils.py @@ -19,8 +19,8 @@ def dtypes_to_json_task(dtypes_dict: dict[str, Any], local_json_path: str) -> No dtypes_dict (dict): Dictionary containing data types. local_json_path (str): Path to local json file. """ - with Path(local_json_path).open("w") as fp: - json.dump(dtypes_dict, fp) + with Path(local_json_path).open("w") as file_path: + json.dump(dtypes_dict, file_path) @task @@ -59,7 +59,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict: "Categorical": "VARCHAR(500)", "Time": "TIME", "Boolean": "VARCHAR(5)", # Bool is True/False, Microsoft expects 0/1 - "DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is the only timezone-aware dtype in TSQL + "DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is timezone-aware dtype in TSQL "Object": "VARCHAR(500)", "EmailAddress": "VARCHAR(50)", "File": None, @@ -73,7 +73,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict: "String": "VARCHAR(500)", "IPAddress": "VARCHAR(39)", "Path": "VARCHAR(255)", - "TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg. '1 days 11:00:00' + "TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg.'1 days 11:00:00' "URL": "VARCHAR(255)", "Count": "INT", } @@ -209,36 +209,3 @@ def union_dfs_task(dfs: list[pd.DataFrame]) -> pd.DataFrame: different size of DataFrames NaN values can appear. """ return pd.concat(dfs, ignore_index=True) - - -@task -def df_clean_column( - df: pd.DataFrame, columns_to_clean: list[str] | None = None -) -> pd.DataFrame: - """Remove special characters from a pandas DataFrame. - - Args: - df (pd.DataFrame): The DataFrame to clean. - columns_to_clean (List[str]): A list of columns to clean. Defaults is None. 
- - Returns: - pd.DataFrame: The cleaned DataFrame - """ - df = df.copy() - - if columns_to_clean is None: - df.replace( - to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"], - value=["", ""], - regex=True, - inplace=True, - ) - else: - for col in columns_to_clean: - df[col].replace( - to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"], - value=["", ""], - regex=True, - inplace=True, - ) - return df diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 08615a9f3..c8501d90d 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -4,6 +4,7 @@ from ._duckdb import DuckDB from ._trino import Trino +from .azure_sql import AzureSQL from .bigquery import BigQuery from .cloud_for_customers import CloudForCustomers from .customer_gauge import CustomerGauge @@ -24,6 +25,7 @@ __all__ = [ + "AzureSQL", "BigQuery", "CloudForCustomers", "CustomerGauge", diff --git a/src/viadot/sources/azure_sql.py b/src/viadot/sources/azure_sql.py new file mode 100644 index 000000000..e6570b83a --- /dev/null +++ b/src/viadot/sources/azure_sql.py @@ -0,0 +1,165 @@ +"""A connector for Azure SQL Database.""" + +import logging +from typing import Literal + +import pandas as pd + +from viadot.utils import df_clean_column, df_converts_bytes_to_int + +from .sql_server import SQLServer + + +logger = logging.getLogger(__name__) + + +class AzureSQL(SQLServer): + """Azure SQL connector class.""" + + def __init__(self, *args, config_key: str = "AZURE_SQL", **kwargs): + """Initialize the AzureSQL connector. + + This constructor sets up the Azure SQL connector with the specified + configuration key. It allows for additional positional and keyword arguments + to be passed to the parent SQLServer class. + + Args: + *args: Variable length argument list passed to the parent class. + config_key (str, optional): The configuration key used to retrieve + connection settings. Defaults to "AZURE_SQL". + **kwargs: Additional keyword arguments passed to the parent class. + """ + super().__init__(*args, config_key=config_key, **kwargs) + + def bulk_insert( + self, + table: str, + schema: str | None = None, + source_path: str | None = None, + sep: str | None = "\t", + if_exists: Literal["append", "replace"] = "append", + ) -> bool: + r"""Function to bulk insert. + + Args: + table (str): Table name. + schema (str, optional): Schema name. Defaults to None. + source_path (str, optional): Full path to a data file. Defaults to one. + sep (str, optional): field terminator to be used for char and + widechar data files. Defaults to "\t". + if_exists (Literal["append", "replace"] , optional): What to do if the table + already exists. Defaults to "append". + """ + if schema is None: + schema = self.DEFAULT_SCHEMA + fqn = f"{schema}.{table}" + insert_sql = f""" + BULK INSERT {fqn} FROM '{source_path}' + WITH ( + CHECK_CONSTRAINTS, + DATA_SOURCE='{self.credentials['data_source']}', + DATAFILETYPE='char', + FIELDTERMINATOR='{sep}', + ROWTERMINATOR='0x0a', + FIRSTROW=2, + KEEPIDENTITY, + TABLOCK, + CODEPAGE='65001' + ); + """ + if if_exists == "replace": + self.run(f"DELETE FROM {schema}.{table}") # noqa: S608 + self.run(insert_sql) + return True + + def create_external_database( + self, + external_database_name: str, + storage_account_name: str, + container_name: str, + sas_token: str, + master_key_password: str, + credential_name: str | None = None, + ) -> None: + """Create an external database. + + Used to eg. execute BULK INSERT or OPENROWSET queries. 
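+
+        The method runs three statements in sequence: it creates a database
+        master key, a database scoped credential wrapping the SAS token, and
+        finally the external data source pointing at the blob container.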
+ + Args: + external_database_name (str): The name of the extrnal source (db) + to be created. + storage_account_name (str): The name of the Azure storage account. + container_name (str): The name of the container which should + become the "database". + sas_token (str): The SAS token to be used as the credential. + Note that the auth system in Azure is pretty broken and you might need + to paste here your storage account's account key instead. + master_key_password (str): The password for the database master key of your + Azure SQL Database. + credential_name (str): How to name the SAS credential. This is really + an Azure internal thing and can be anything. + By default '{external_database_name}_credential`. + """ + # stupid Microsoft thing + if sas_token.startswith("?"): + sas_token = sas_token[1:] + + if credential_name is None: + credential_name = f"{external_database_name}_credential" + + create_master_key_sql = ( + f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = {master_key_password}" + ) + + create_external_db_credential_sql = f""" + CREATE DATABASE SCOPED CREDENTIAL {credential_name} + WITH IDENTITY = 'SHARED ACCESS SIGNATURE' + SECRET = '{sas_token}'; + """ + + create_external_db_sql = f""" + CREATE EXTERNAL DATA SOURCE {external_database_name} WITH ( + LOCATION = f'https://{storage_account_name}.blob.core.windows.net/' \ + f'{container_name}', + CREDENTIAL = {credential_name} + ); + """ + + self.run(create_master_key_sql) + self.run(create_external_db_credential_sql) + self.run(create_external_db_sql) + + def to_df( + self, + query: str, + if_empty: Literal["warn", "skip", "fail"] = "warn", + convert_bytes: bool = False, + remove_special_characters: bool | None = None, + columns_to_clean: list[str] | None = None, + ) -> pd.DataFrame: + """Execute a query and return the result as a pandas DataFrame. + + Args: + query (str): The query to execute. + con (pyodbc.Connection, optional): The connection to use to pull the data. + if_empty (Literal["warn", "skip", "fail"], optional): What to do if the + query returns no data. Defaults to None. + convert_bytes (bool). A boolean value to trigger method + df_converts_bytes_to_int. It is used to convert bytes data type into + int, as pulling data with bytes can lead to malformed data in dataframe. + Defaults to False. + remove_special_characters (str, optional): Call a function that remove + special characters like escape symbols. Defaults to None. + columns_to_clean (List(str), optional): Select columns to clean, used with + remove_special_characters. If None whole data frame will be processed. + Defaults to None. + """ + df = super().to_df(query=query, if_empty=if_empty) + + if convert_bytes: + df = df_converts_bytes_to_int(df=df) + + if remove_special_characters: + df = df_clean_column(df=df, columns_to_clean=columns_to_clean) + + return df diff --git a/src/viadot/utils.py b/src/viadot/utils.py index a7916ae92..ec581d9cd 100644 --- a/src/viadot/utils.py +++ b/src/viadot/utils.py @@ -962,3 +962,47 @@ def anonymize_df( df.drop(columns=["temp_date_col"], inplace=True, errors="ignore") return df + + +def df_converts_bytes_to_int(df: pd.DataFrame) -> pd.DataFrame: + """Task to convert bytes values to int. + + Args: + df (pd.DataFrame): Data Frame to convert + + Returns: + pd.DataFrame: Data Frame after convert + """ + return df.map(lambda x: int(x) if isinstance(x, bytes) else x) + + +def df_clean_column( + df: pd.DataFrame, columns_to_clean: list[str] | None = None +) -> pd.DataFrame: + """Remove special characters from a pandas DataFrame. 
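+
+    Tab, newline, and carriage-return characters (and their escaped textual
+    forms) are replaced with an empty string in the selected columns.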
+ + Args: + df (pd.DataFrame): The DataFrame to clean. + columns_to_clean (List[str]): A list of columns to clean. Defaults is None. + + Returns: + pd.DataFrame: The cleaned DataFrame + """ + df = df.copy() + + if columns_to_clean is None: + df.replace( + to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"], + value=["", ""], + regex=True, + inplace=True, + ) + else: + for col in columns_to_clean: + df[col].replace( + to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"], + value=["", ""], + regex=True, + inplace=True, + ) + return df diff --git a/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py b/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py new file mode 100644 index 000000000..44f5c78bb --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py @@ -0,0 +1,88 @@ +from unittest.mock import patch + +import pandas as pd +import pytest + +from viadot.orchestration.prefect.flows import azure_sql_to_adls +from viadot.sources.azure_data_lake import AzureDataLake + + +@pytest.fixture +def query(): + return "SELECT * FROM your_table_name" + + +@pytest.fixture +def TEST_FILE_PATH(): + return "test_file_path" + + +@pytest.fixture +def adls_credentials_secret(): + return "mock_adls_credentials_secret" + + +@pytest.fixture +def azure_sql_credentials_secret(): + return "mock_azure_sql_credentials_secret" + + +def test_azure_sql_to_adls( + query, + TEST_FILE_PATH, + adls_credentials_secret, + azure_sql_credentials_secret, +): + lake = AzureDataLake(config_key="adls_test") + + # Ensure the file does not exist before the test + assert not lake.exists(TEST_FILE_PATH) + + with ( + patch( + "viadot.orchestration.prefect.tasks.azure_sql_to_df" + ) as mock_azure_sql_to_df, + patch("viadot.orchestration.prefect.tasks.df_to_adls") as mock_df_to_adls, + ): + mock_df = pd.DataFrame({"column1": [1, 2], "column2": [3, 4]}) + mock_azure_sql_to_df.return_value = mock_df + + # Call the flow + azure_sql_to_adls( + query=query, + credentials_secret=azure_sql_credentials_secret, + validate_df_dict=None, + convert_bytes=False, + remove_special_characters=None, + columns_to_clean=None, + adls_config_key="adls_test", + adls_azure_key_vault_secret=adls_credentials_secret, + adls_path=TEST_FILE_PATH, + adls_path_overwrite=True, + ) + + # Assert that the azure_sql_to_df task was called with the correct arguments + mock_azure_sql_to_df.assert_called_once_with( + query=query, + credentials_secret=azure_sql_credentials_secret, + sep=",", + file_path=TEST_FILE_PATH, + if_exists="replace", + validate_df_dict=None, + convert_bytes=False, + remove_special_characters=None, + columns_to_clean=None, + ) + + # Assert that df_to_adls was called with the correct arguments + mock_df_to_adls.assert_called_once_with( + df=mock_df, + path=TEST_FILE_PATH, + credentials_secret=adls_credentials_secret, + config_key="adls_test", + overwrite=True, + ) + + assert lake.exists(TEST_FILE_PATH) + + lake.rm(TEST_FILE_PATH) diff --git a/tests/unit/test_azure_sql.py b/tests/unit/test_azure_sql.py new file mode 100644 index 000000000..5b419cf97 --- /dev/null +++ b/tests/unit/test_azure_sql.py @@ -0,0 +1,73 @@ +import pytest + +from viadot.sources.azure_sql import AzureSQL +from viadot.sources.sql_server import SQLServerCredentials + + +@pytest.fixture +def azure_sql_credentials(): + return SQLServerCredentials( + user="test_user", + password="test_password", # pragma: allowlist secret # noqa: S106 + server="localhost", + db_name="test_db", + driver="ODBC Driver 17 for SQL Server", + ) + + 
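+# The fixture below patches SQL.con so that AzureSQL can be constructed
+# without opening a real database connection.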
+@pytest.fixture +def azure_sql(azure_sql_credentials: SQLServerCredentials, mocker): + mocker.patch("viadot.sources.base.SQL.con", return_value=True) + + return AzureSQL( + credentials={ + "user": azure_sql_credentials.user, + "password": azure_sql_credentials.password, + "server": azure_sql_credentials.server, + "db_name": azure_sql_credentials.db_name, + "data_source": "test_data_source", + } + ) + + +def test_azure_sql_initialization(azure_sql): + """Test that the AzureSQL object is initialized with the correct credentials.""" + assert azure_sql.credentials["server"] == "localhost" + assert azure_sql.credentials["user"] == "test_user" + assert ( + azure_sql.credentials["password"] + == "test_password" # pragma: allowlist secret # noqa: S105 + ) + assert azure_sql.credentials["db_name"] == "test_db" + + +def test_create_external_database(azure_sql, mocker): + """Test the create_external_database function.""" + mock_run = mocker.patch("viadot.sources.base.SQL.run", return_value=True) + + # Test parameters + external_database_name = "test_external_db" + storage_account_name = "test_storage_account" + container_name = "test_container" + sas_token = "test_sas_token" # noqa: S105 + master_key_password = ( + "test_master_key_password" # pragma: allowlist secret # noqa: S105 + ) + credential_name = "custom_credential_name" + + azure_sql.create_external_database( + external_database_name=external_database_name, + storage_account_name=storage_account_name, + container_name=container_name, + sas_token=sas_token, + master_key_password=master_key_password, + credential_name=credential_name, + ) + + # Expected SQL commands with custom credential name + expected_master_key_sql = ( + f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = {master_key_password}" + ) + + mock_run.assert_any_call(expected_master_key_sql) + assert mock_run.call_count == 3 # Ensure all 3 SQL commands were executed diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index ec2c74e8a..2b2b716e6 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -8,6 +8,8 @@ from viadot.utils import ( _cast_df_cols, add_viadot_metadata_columns, + df_clean_column, + df_converts_bytes_to_int, gen_bulk_insert_query_from_df, get_fqn, handle_api_request, @@ -282,3 +284,56 @@ def test_validate_and_reorder_different_order_columns(): assert result[0].equals(df1) assert list(result[1].columns) == list(expected_df2.columns) assert result[1].equals(expected_df2) + + +def test_df_converts_bytes_to_int(): + df_bytes = pd.DataFrame( + { + "A": [b"1", b"2", b"3"], + "B": [b"4", b"5", b"6"], + "C": ["no change", "still no change", "me neither"], + } + ) + + result = df_converts_bytes_to_int(df_bytes) + + expected = pd.DataFrame( + { + "A": [1, 2, 3], + "B": [4, 5, 6], + "C": ["no change", "still no change", "me neither"], + } + ) + + pd.testing.assert_frame_equal(result, expected) + + df_no_bytes = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) + + result_no_bytes = df_converts_bytes_to_int(df_no_bytes) + + pd.testing.assert_frame_equal(result_no_bytes, df_no_bytes) + + +def test_df_clean_column(): + df_dirty = pd.DataFrame( + { + "A": ["Hello\tWorld", "Goodbye\nWorld"], + "B": ["Keep\nIt\tClean", "Just\tTest"], + } + ) + + cleaned_df = df_clean_column(df_dirty, columns_to_clean=["A"]) + + expected_cleaned_df = pd.DataFrame( + {"A": ["HelloWorld", "GoodbyeWorld"], "B": ["Keep\nIt\tClean", "Just\tTest"]} + ) + + pd.testing.assert_frame_equal(cleaned_df, expected_cleaned_df) + + cleaned_all_df = df_clean_column(df_dirty) + + 
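+    # With columns_to_clean=None the whole frame is processed, so column B is
+    # cleaned as well.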
expected_all_cleaned_df = pd.DataFrame( + {"A": ["HelloWorld", "GoodbyeWorld"], "B": ["KeepItClean", "JustTest"]} + ) + + pd.testing.assert_frame_equal(cleaned_all_df, expected_all_cleaned_df) From 573f988f9d050789a3b149f4529bf4440256faf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zawadzki?= Date: Fri, 4 Oct 2024 12:29:39 +0200 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=93=9D=20Add=20missing=20reference=20?= =?UTF-8?q?docs=20(#1080)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🔥 Remove duplicate import * ♻️ Clean up the imports mess * 📝 Add missing reference docs & sort * 🐛 Fix import shadowing 3rd party lib --- .prettierignore | 3 +- .../references/orchestration/prefect/flows.md | 52 ++++++++++++------- .../references/orchestration/prefect/tasks.md | 48 ++++++++++++----- docs/references/sources/api.md | 28 ++++++---- .../sources/{sql.md => database.md} | 26 +++++----- docs/references/sources/other.md | 3 ++ mkdocs.yml | 5 +- .../orchestration/prefect/tasks/__init__.py | 1 - src/viadot/sources/__init__.py | 17 +++--- src/viadot/sources/{minio.py => _minio.py} | 0 tests/unit/test_supermetrics.py | 3 +- 11 files changed, 119 insertions(+), 67 deletions(-) rename docs/references/sources/{sql.md => database.md} (90%) create mode 100644 docs/references/sources/other.md rename src/viadot/sources/{minio.py => _minio.py} (100%) diff --git a/.prettierignore b/.prettierignore index 3845d6cca..f59391e92 100644 --- a/.prettierignore +++ b/.prettierignore @@ -1 +1,2 @@ -docs/references/sources/sql.md +docs/references/sources/database.md +docs/references/sources/api.md diff --git a/docs/references/orchestration/prefect/flows.md b/docs/references/orchestration/prefect/flows.md index 0ccc75105..c73f976f4 100644 --- a/docs/references/orchestration/prefect/flows.md +++ b/docs/references/orchestration/prefect/flows.md @@ -1,24 +1,12 @@ -::: viadot.orchestration.prefect.flows.cloud_for_customers_to_adls - -::: viadot.orchestration.prefect.flows.cloud_for_customers_to_databricks - -::: viadot.orchestration.prefect.flows.exchange_rates_to_adls - -::: viadot.orchestration.prefect.flows.exchange_rates_to_databricks - -::: viadot.orchestration.prefect.flows.sap_to_redshift_spectrum - -::: viadot.orchestration.prefect.flows.sharepoint_to_adls - -::: viadot.orchestration.prefect.flows.sharepoint_to_databricks +::: viadot.orchestration.prefect.flows.azure_sql_to_adls -::: viadot.orchestration.prefect.flows.sharepoint_to_redshift_spectrum +::: viadot.orchestration.prefect.flows.bigquery_to_adls -::: viadot.orchestration.prefect.flows.sharepoint_to_s3 +::: viadot.orchestration.prefect.flows.cloud_for_customers_to_adls -::: viadot.orchestration.prefect.flows.transform +::: viadot.orchestration.prefect.flows.cloud_for_customers_to_databricks -::: viadot.orchestration.prefect.flows.transform_and_catalog +::: viadot.orchestration.prefect.flows.customer_gauge_to_adls ::: viadot.orchestration.prefect.flows.duckdb_to_parquet @@ -28,20 +16,48 @@ ::: viadot.orchestration.prefect.flows.epicor_to_parquet +::: viadot.orchestration.prefect.flows.exchange_rates_to_adls + +::: viadot.orchestration.prefect.flows.exchange_rates_to_databricks + ::: viadot.orchestration.prefect.flows.exchange_rates_api_to_redshift_spectrum ::: viadot.orchestration.prefect.flows.genesys_to_adls ::: viadot.orchestration.prefect.flows.hubspot_to_adls +::: viadot.orchestration.prefect.flows.mediatool_to_adls + ::: viadot.orchestration.prefect.flows.mindful_to_adls ::: 
viadot.orchestration.prefect.flows.outlook_to_adls +::: viadot.orchestration.prefect.flows.salesforce_to_adls + +::: viadot.orchestration.prefect.flows.sap_bw_to_adls + ::: viadot.orchestration.prefect.flows.sap_to_parquet +::: viadot.orchestration.prefect.flows.sap_to_redshift_spectrum + +::: viadot.orchestration.prefect.flows.sftp_to_adls + +::: viadot.orchestration.prefect.flows.sharepoint_to_adls + +::: viadot.orchestration.prefect.flows.sharepoint_to_databricks + +::: viadot.orchestration.prefect.flows.sharepoint_to_redshift_spectrum + +::: viadot.orchestration.prefect.flows.sharepoint_to_s3 + ::: viadot.orchestration.prefect.flows.sql_server_to_minio ::: viadot.orchestration.prefect.flows.sql_server_to_parquet -::: viadot.orchestration.prefect.flows.azure_sql_to_adls +::: viadot.orchestration.prefect.flows.supermetrics_to_adls + +::: viadot.orchestration.prefect.flows.transform + +::: viadot.orchestration.prefect.flows.transform_and_catalog + +::: viadot.orchestration.prefect.flows.vid_club_to_adls diff --git a/docs/references/orchestration/prefect/tasks.md b/docs/references/orchestration/prefect/tasks.md index 002c8852f..1df0d5741 100644 --- a/docs/references/orchestration/prefect/tasks.md +++ b/docs/references/orchestration/prefect/tasks.md @@ -1,47 +1,67 @@ +::: viadot.orchestration.prefect.tasks.azure_sql_to_df + ::: viadot.orchestration.prefect.tasks.adls_upload -::: viadot.orchestration.prefect.tasks.df_to_adls +::: viadot.orchestration.prefect.tasks.bcp -::: viadot.orchestration.prefect.tasks.cloud_for_customers_to_df +::: viadot.orchestration.prefect.tasks.clone_repo -::: viadot.orchestration.prefect.tasks.df_to_databricks +::: viadot.orchestration.prefect.tasks.bigquery_to_df -::: viadot.orchestration.prefect.tasks.dbt_task +::: viadot.orchestration.prefect.tasks.cloud_for_customers_to_df -::: viadot.orchestration.prefect.tasks.exchange_rates_to_df +::: viadot.orchestration.prefect.tasks.create_sql_server_table -::: viadot.orchestration.prefect.tasks.clone_repo +::: viadot.orchestration.prefect.tasks.customer_gauge_to_df -::: viadot.orchestration.prefect.tasks.luma_ingest_task +::: viadot.orchestration.prefect.tasks.dbt_task -::: viadot.orchestration.prefect.tasks.df_to_redshift_spectrum +::: viadot.orchestration.prefect.tasks.df_to_adls -::: viadot.orchestration.prefect.tasks.s3_upload_file +::: viadot.orchestration.prefect.tasks.df_to_databricks -::: viadot.orchestration.prefect.tasks.sharepoint_download_file +::: viadot.orchestration.prefect.tasks.df_to_minio -::: viadot.orchestration.prefect.tasks.sharepoint_to_df +::: viadot.orchestration.prefect.tasks.df_to_redshift_spectrum ::: viadot.orchestration.prefect.tasks.duckdb_query ::: viadot.orchestration.prefect.tasks.epicor_to_df +::: viadot.orchestration.prefect.tasks.exchange_rates_to_df + ::: viadot.orchestration.prefect.tasks.genesys_to_df ::: viadot.orchestration.prefect.tasks.hubspot_to_df +::: viadot.orchestration.prefect.tasks.luma_ingest_task + +::: viadot.orchestration.prefect.tasks.mediatool_to_df + ::: viadot.orchestration.prefect.tasks.mindful_to_df ::: viadot.orchestration.prefect.tasks.outlook_to_df +::: viadot.orchestration.prefect.tasks.s3_upload_file + +::: viadot.orchestration.prefect.tasks.salesforce_to_df + +::: viadot.orchestration.prefect.tasks.sap_bw_to_df + ::: viadot.orchestration.prefect.tasks.sap_rfc_to_df -::: viadot.orchestration.prefect.tasks.bcp +::: viadot.orchestration.prefect.tasks.sftp_list -::: viadot.orchestration.prefect.tasks.create_sql_server_table +::: 
viadot.orchestration.prefect.tasks.sftp_to_df + +::: viadot.orchestration.prefect.tasks.sharepoint_download_file + +::: viadot.orchestration.prefect.tasks.sharepoint_to_df ::: viadot.orchestration.prefect.tasks.sql_server_query ::: viadot.orchestration.prefect.tasks.sql_server_to_df -::: viadot.orchestration.prefect.tasks.azure_sql_to_df +::: viadot.orchestration.prefect.tasks.vid_club_to_df + +::: viadot.orchestration.prefect.tasks.supermetrics_to_df diff --git a/docs/references/sources/api.md b/docs/references/sources/api.md index f106b3663..5a0ecc866 100644 --- a/docs/references/sources/api.md +++ b/docs/references/sources/api.md @@ -1,23 +1,33 @@ -# API Sources +# API sources -::: viadot.sources.uk_carbon_intensity.UKCarbonIntensity +::: viadot.sources.bigquery.BigQuery ::: viadot.sources.cloud_for_customers.CloudForCustomers -::: viadot.sources.exchange_rates.ExchangeRates +::: viadot.sources.customer_gauge.CustomerGauge -::: viadot.sources.cloud_for_customers.CloudForCustomers +::: viadot.sources.epicor.Epicor -::: viadot.sources.sharepoint.Sharepoint +::: viadot.sources.exchange_rates.ExchangeRates ::: viadot.sources.genesys.Genesys -::: viadot.sources.outlook.Outlook - ::: viadot.sources.hubspot.Hubspot -::: viadot.sources.epicor.Epicor +::: viadot.sources.mediatool.Mediatool ::: viadot.sources.mindful.Mindful -::: viadot.sources.minio.MinIO +::: viadot.sources._minio.MinIO + +::: viadot.sources.outlook.Outlook + +::: viadot.sources.salesforce.Salesforce + +::: viadot.sources.sharepoint.Sharepoint + +::: viadot.sources.supermetrics.Supermetrics + +::: viadot.sources.uk_carbon_intensity.UKCarbonIntensity + +::: viadot.sources.vid_club.VidClub diff --git a/docs/references/sources/sql.md b/docs/references/sources/database.md similarity index 90% rename from docs/references/sources/sql.md rename to docs/references/sources/database.md index 114bf97a2..732cdd2e6 100644 --- a/docs/references/sources/sql.md +++ b/docs/references/sources/database.md @@ -1,27 +1,29 @@ -# SQL Sources +# Database sources -::: viadot.sources.base.Source +::: viadot.sources.azure_data_lake.AzureDataLake -::: viadot.sources.base.SQL +::: viadot.sources.azure_sql.AzureSQL -::: viadot.sources.azure_data_lake.AzureDataLake +::: viadot.sources.databricks.Databricks + +::: viadot.sources._duckdb.DuckDB ::: viadot.sources.redshift_spectrum.RedshiftSpectrum ::: viadot.sources.s3.S3 -::: viadot.sources.sqlite.SQLite +::: viadot.sources.sap_bw.SAPBW -::: viadot.sources.sql_server.SQLServer +::: viadot.sources.sap_rfc.SAPRFC -::: viadot.sources.databricks.Databricks +::: viadot.sources.sap_rfc.SAPRFCV2 -::: viadot.sources._trino.Trino +::: viadot.sources.base.Source -::: viadot.sources._duckdb.DuckDB +::: viadot.sources.base.SQL -::: viadot.sources.sap_rfc.SAPRFC +::: viadot.sources.sqlite.SQLite -::: viadot.sources.sap_rfc.SAPRFCV2 +::: viadot.sources.sql_server.SQLServer -::: viadot.sources.azure_sql.AzureSQL +::: viadot.sources._trino.Trino diff --git a/docs/references/sources/other.md b/docs/references/sources/other.md new file mode 100644 index 000000000..2b6bc909e --- /dev/null +++ b/docs/references/sources/other.md @@ -0,0 +1,3 @@ +# Other sources + +::: viadot.sources.sftp.Sftp diff --git a/mkdocs.yml b/mkdocs.yml index caad33a62..628be00c3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -21,8 +21,9 @@ nav: - References: - Sources: - - SQL Sources: references/sources/sql.md - - API Sources: references/sources/api.md + - Database: references/sources/database.md + - API: references/sources/api.md + - Other: 
references/sources/other.md - Orchestration: - Prefect: - Tasks: references/orchestration/prefect/tasks.md diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 7250c17f4..f9798de4d 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -43,7 +43,6 @@ "dbt_task", "df_to_adls", "df_to_databricks", - "df_to_databricks", "df_to_minio", "df_to_redshift_spectrum", "duckdb_query", diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index c8501d90d..8c2fcc409 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -19,7 +19,8 @@ from .sftp import Sftp from .sharepoint import Sharepoint from .sql_server import SQLServer -from .supermetrics import Supermetrics, SupermetricsCredentials +from .sqlite import SQLite +from .supermetrics import Supermetrics from .uk_carbon_intensity import UKCarbonIntensity from .vid_club import VidClub @@ -29,6 +30,7 @@ "BigQuery", "CloudForCustomers", "CustomerGauge", + "DuckDB", "Epicor", "ExchangeRates", "Genesys", @@ -36,36 +38,33 @@ "Mediatool", "Mindful", "Outlook", + "SQLite", "SQLServer", "Salesforce", "Sftp", "Sharepoint", "Supermetrics", - "SupermetricsCredentials", # pragma: allowlist-secret "Trino", "UKCarbonIntensity", "VidClub", ] + if find_spec("adlfs"): from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401 __all__.extend(["AzureDataLake"]) -if find_spec("duckdb"): - from viadot.sources._duckdb import DuckDB # noqa: F401 - __all__.extend(["DuckDB"]) if find_spec("redshift_connector"): from viadot.sources.redshift_spectrum import RedshiftSpectrum # noqa: F401 __all__.extend(["RedshiftSpectrum"]) + if find_spec("s3fs"): + from viadot.sources._minio import MinIO # noqa: F401 from viadot.sources.s3 import S3 # noqa: F401 - __all__.extend(["S3"]) -if find_spec("s3fs"): - from viadot.sources.minio import MinIO # noqa: F401 + __all__.extend(["S3", "MinIO"]) - __all__.extend(["MinIO"]) if find_spec("pyrfc"): from viadot.sources.sap_bw import SAPBW # noqa: F401 from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401 diff --git a/src/viadot/sources/minio.py b/src/viadot/sources/_minio.py similarity index 100% rename from src/viadot/sources/minio.py rename to src/viadot/sources/_minio.py diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py index 0cfa48cd8..dcbd6e5a7 100644 --- a/tests/unit/test_supermetrics.py +++ b/tests/unit/test_supermetrics.py @@ -1,6 +1,7 @@ import pytest -from viadot.sources import Supermetrics, SupermetricsCredentials +from viadot.sources import Supermetrics +from viadot.sources.supermetrics import SupermetricsCredentials @pytest.fixture
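The regrouped `src/viadot/sources/__init__.py` relies on the optional-dependency guard pattern: a source class is imported and added to `__all__` only when the extra package it needs is installed, which is why `MinIO` now sits under the same `s3fs` check as `S3` after the rename to `_minio.py`. A minimal, self-contained sketch of that pattern, assuming the same `find_spec` checks as the diff (the always-exported names and the printed result are illustrative; the real viadot imports are omitted):

from importlib.util import find_spec

# Sources that need no optional extras are always exported.
__all__ = ["Supermetrics", "SQLite"]

# find_spec() returns a ModuleSpec when the package is importable and None otherwise,
# so each block runs only in environments where the extra dependency is installed.
if find_spec("s3fs"):
    # Both S3 and MinIO depend on s3fs, hence a single shared guard.
    __all__.extend(["S3", "MinIO"])

if find_spec("pyrfc"):
    # SAP connectors require the optional pyrfc bindings.
    __all__.extend(["SAPBW", "SAPRFC", "SAPRFCV2"])

print(__all__)  # e.g. ['Supermetrics', 'SQLite', 'S3', 'MinIO'] when only s3fs is present

With this layout, `from viadot.sources import *` keeps working in a minimal install, and adding a new optional source only means extending the matching `find_spec` block.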