diff --git a/docs/references/orchestration/prefect/flows.md b/docs/references/orchestration/prefect/flows.md
index c73f976f4..062250991 100644
--- a/docs/references/orchestration/prefect/flows.md
+++ b/docs/references/orchestration/prefect/flows.md
@@ -2,6 +2,8 @@
 
 ::: viadot.orchestration.prefect.flows.bigquery_to_adls
 
+::: viadot.orchestration.prefect.flows.business_core_to_parquet
+
 ::: viadot.orchestration.prefect.flows.cloud_for_customers_to_adls
 
 ::: viadot.orchestration.prefect.flows.cloud_for_customers_to_databricks
diff --git a/docs/references/orchestration/prefect/tasks.md b/docs/references/orchestration/prefect/tasks.md
index 1df0d5741..ab12a86e1 100644
--- a/docs/references/orchestration/prefect/tasks.md
+++ b/docs/references/orchestration/prefect/tasks.md
@@ -8,6 +8,8 @@
 
 ::: viadot.orchestration.prefect.tasks.bigquery_to_df
 
+::: viadot.orchestration.prefect.tasks.business_core.business_core_to_df
+
 ::: viadot.orchestration.prefect.tasks.cloud_for_customers_to_df
 
 ::: viadot.orchestration.prefect.tasks.create_sql_server_table
diff --git a/docs/references/sources/api.md b/docs/references/sources/api.md
index 5a0ecc866..306fec20c 100644
--- a/docs/references/sources/api.md
+++ b/docs/references/sources/api.md
@@ -2,6 +2,8 @@
 
 ::: viadot.sources.bigquery.BigQuery
 
+::: viadot.sources.business_core.BusinessCore
+
 ::: viadot.sources.cloud_for_customers.CloudForCustomers
 
 ::: viadot.sources.customer_gauge.CustomerGauge
diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py
index fce71f78f..b6e7fc292 100644
--- a/src/viadot/orchestration/prefect/flows/__init__.py
+++ b/src/viadot/orchestration/prefect/flows/__init__.py
@@ -2,6 +2,7 @@
 
 from .azure_sql_to_adls import azure_sql_to_adls
 from .bigquery_to_adls import bigquery_to_adls
+from .business_core_to_parquet import business_core_to_parquet
 from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
 from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
 from .customer_gauge_to_adls import customer_gauge_to_adls
@@ -37,6 +38,7 @@
 __all__ = [
     "azure_sql_to_adls",
     "bigquery_to_adls",
+    "business_core_to_parquet",
     "cloud_for_customers_to_adls",
     "cloud_for_customers_to_databricks",
     "customer_gauge_to_adls",
diff --git a/src/viadot/orchestration/prefect/flows/business_core_to_parquet.py b/src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
new file mode 100644
index 000000000..8ddc37669
--- /dev/null
+++ b/src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
@@ -0,0 +1,61 @@
+"""Flow for downloading data from Business Core API to a Parquet file."""
+
+from typing import Any, Literal
+
+from prefect import flow
+
+from viadot.orchestration.prefect.tasks.business_core import business_core_to_df
+from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet
+
+
+@flow(
+    name="extract--businesscore--parquet",
+    description="Extract data from Business Core API and load it into a Parquet file",
+    retries=1,
+    retry_delay_seconds=60,
+)
+def business_core_to_parquet(
+    path: str | None = None,
+    url: str | None = None,
+    filters: dict[str, Any] | None = None,
+    credentials_secret: str | None = None,
+    config_key: str | None = None,
+    if_empty: str = "skip",
+    if_exists: Literal["append", "replace", "skip"] = "replace",
+    verify: bool = True,
+) -> None:
+    """Download data from Business Core API to a Parquet file.
+
+    Args:
+        path (str, required): Path where to save the Parquet file. Defaults to None.
+        url (str, required): Base url to the view in Business Core API.
+            Defaults to None.
+        filters (dict[str, Any], optional): Filters in form of dictionary.
+            Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
+            Defaults to None.
+        credentials_secret (str, optional): The name of the secret that stores Business
+            Core credentials. Defaults to None.
+            More info on: https://docs.prefect.io/concepts/blocks/
+        config_key (str, optional): The key in the viadot config holding relevant
+            credentials. Defaults to None.
+        if_empty (str, optional): What to do if output DataFrame is empty.
+            Defaults to "skip".
+        if_exists (Literal["append", "replace", "skip"], optional):
+            What to do if the file already exists. Defaults to "replace".
+        verify (bool, optional): Whether or not to verify certificates while
+            connecting to an API. Defaults to True.
+    """
+    df = business_core_to_df(
+        url=url,
+        path=path,
+        credentials_secret=credentials_secret,
+        config_key=config_key,
+        filters=filters,
+        if_empty=if_empty,
+        verify=verify,
+    )
+    return df_to_parquet(
+        df=df,
+        path=path,
+        if_exists=if_exists,
+    )
diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py
index f9798de4d..c895d12af 100644
--- a/src/viadot/orchestration/prefect/tasks/__init__.py
+++ b/src/viadot/orchestration/prefect/tasks/__init__.py
@@ -4,6 +4,7 @@
 from .azure_sql import azure_sql_to_df
 from .bcp import bcp
 from .bigquery import bigquery_to_df
+from .business_core import business_core_to_df
 from .cloud_for_customers import cloud_for_customers_to_df
 from .customer_gauge_to_df import customer_gauge_to_df
 from .databricks import df_to_databricks
@@ -37,6 +38,7 @@
     "bcp",
     "clone_repo",
     "bigquery_to_df",
+    "business_core_to_df",
     "cloud_for_customers_to_df",
     "create_sql_server_table",
     "customer_gauge_to_df",
diff --git a/src/viadot/orchestration/prefect/tasks/business_core.py b/src/viadot/orchestration/prefect/tasks/business_core.py
new file mode 100644
index 000000000..4998daf84
--- /dev/null
+++ b/src/viadot/orchestration/prefect/tasks/business_core.py
@@ -0,0 +1,70 @@
+"""Task for downloading data from Business Core API into a pandas DataFrame."""
+
+from typing import Any
+
+from pandas import DataFrame
+from prefect import task
+from prefect.logging import get_run_logger
+
+from viadot.config import get_source_credentials
+from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
+from viadot.orchestration.prefect.utils import get_credentials
+from viadot.sources.business_core import BusinessCore
+
+
+@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
+def business_core_to_df(
+    path: str | None = None,
+    url: str | None = None,
+    filters: dict[str, Any] | None = None,
+    credentials_secret: str | None = None,
+    config_key: str | None = None,
+    if_empty: str = "skip",
+    verify: bool = True,
+) -> DataFrame:
+    """Download data from Business Core API into a pandas DataFrame.
+
+    Args:
+        path (str, required): Path where to save the Parquet file. Defaults to None.
+        url (str, required): Base url to the view in Business Core API. Defaults to
+            None.
+        filters (dict[str, Any], optional): Filters in form of dictionary. Available
+            filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'. Defaults to None.
+        credentials_secret (str, optional): The name of the secret that stores Business
+            Core credentials. More info on: https://docs.prefect.io/concepts/blocks/.
+            Defaults to None.
+        config_key (str, optional): The key in the viadot config holding relevant
+            credentials. Defaults to None.
+        if_empty (str, optional): What to do if output DataFrame is empty. Defaults to
+            "skip".
+        verify (bool, optional): Whether or not to verify certificates while connecting
+            to an API. Defaults to True.
+    """
+    if not (credentials_secret or config_key):
+        raise MissingSourceCredentialsError
+
+    logger = get_run_logger()
+
+    credentials = get_source_credentials(config_key) or get_credentials(
+        credentials_secret
+    )
+
+    bc = BusinessCore(
+        url=url,
+        path=path,
+        credentials=credentials,
+        config_key=config_key,
+        filters=filters,
+        verify=verify,
+    )
+
+    df = bc.to_df(if_empty=if_empty)
+
+    nrows = df.shape[0]
+    ncols = df.shape[1]
+
+    logger.info(
+        f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
+    )
+
+    return df
diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py
index 8c2fcc409..f3d2a792a 100644
--- a/src/viadot/sources/__init__.py
+++ b/src/viadot/sources/__init__.py
@@ -6,6 +6,7 @@
 from ._trino import Trino
 from .azure_sql import AzureSQL
 from .bigquery import BigQuery
+from .business_core import BusinessCore
 from .cloud_for_customers import CloudForCustomers
 from .customer_gauge import CustomerGauge
 from .epicor import Epicor
@@ -28,6 +29,7 @@
 __all__ = [
     "AzureSQL",
     "BigQuery",
+    "BusinessCore",
     "CloudForCustomers",
     "CustomerGauge",
     "DuckDB",
diff --git a/src/viadot/sources/business_core.py b/src/viadot/sources/business_core.py
new file mode 100644
index 000000000..d8a64847f
--- /dev/null
+++ b/src/viadot/sources/business_core.py
@@ -0,0 +1,164 @@
+"""Source for connecting to Business Core API."""
+
+import json
+from typing import Any, Literal
+
+import pandas as pd
+from pydantic import BaseModel, SecretStr
+
+from viadot.config import get_source_credentials
+from viadot.exceptions import APIError
+from viadot.sources.base import Source
+from viadot.utils import add_viadot_metadata_columns, handle_api_response
+
+
+class BusinessCoreCredentials(BaseModel):
+    """Business Core credentials.
+
+    Uses simple authentication:
+        - username: The user name to use.
+        - password: The password to use.
+    """
+
+    username: str
+    password: SecretStr
+
+
+class BusinessCore(Source):
+    """Business Core ERP API connector."""
+
+    def __init__(
+        self,
+        url: str | None = None,
+        filters: dict[str, Any] | None = None,
+        credentials: dict[str, Any] | None = None,
+        config_key: str = "BusinessCore",
+        verify: bool = True,
+        *args,
+        **kwargs,
+    ):
+        """Create a BusinessCore connector instance.
+
+        Args:
+            url (str, optional): Base url to a view in Business Core API.
+                Defaults to None.
+            filters (dict[str, Any], optional): Filters in form of dictionary.
+                Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
+                Defaults to None.
+            credentials (dict[str, Any], optional): Credentials stored in a dictionary.
+                Required credentials: username, password. Defaults to None.
+            config_key (str, optional): The key in the viadot config holding relevant
+                credentials. Defaults to "BusinessCore".
+            verify (bool, optional): Whether or not to verify certificates while
+                connecting to an API. Defaults to True.
+        """
+        raw_creds = credentials or get_source_credentials(config_key)
+        validated_creds = dict(BusinessCoreCredentials(**raw_creds))
+
+        self.url = url
+        self.filters = self._clean_filters(filters)
+        self.verify = verify
+
+        super().__init__(*args, credentials=validated_creds, **kwargs)
+
+    def generate_token(self) -> str:
+        """Generate a token for the user.
+
+        Returns:
+            string: The token.
+        """
+        url = "https://api.businesscore.ae/api/user/Login"
+
+        username = self.credentials.get("username")
+        password = self.credentials.get("password").get_secret_value()
+        payload = f"grant_type=password&username={username}&password={password}&scope="
+        headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        response = handle_api_response(
+            url=url,
+            headers=headers,
+            method="GET",
+            data=payload,
+            verify=self.verify,
+        )
+
+        return json.loads(response.text).get("access_token")
+
+    @staticmethod
+    def _clean_filters(filters: dict[str, str | None]) -> dict[str, str]:
+        """Replace 'None' with '&' in a dictionary.
+
+        Required for payload in 'x-www-form-urlencoded' form.
+
+        Returns:
+            dict[str, str]: Dictionary with filters prepared for further use.
+        """
+        return {key: ("&" if val is None else val) for key, val in filters.items()}
+
+    def get_data(self) -> dict[str, Any]:
+        """Obtain data from Business Core API.
+
+        Returns:
+            dict: Dictionary with data downloaded from Business Core API.
+        """
+        view = self.url.split("/")[-1]
+
+        if view not in [
+            "GetCustomerData",
+            "GetItemMaster",
+            "GetPendingSalesOrderData",
+            "GetSalesInvoiceData",
+            "GetSalesReturnDetailData",
+            "GetSalesOrderData",
+            "GetSalesQuotationData",
+        ]:
+            error_message = f"View {view} currently not available."
+            raise APIError(error_message)
+
+        payload = (
+            "BucketCount="
+            + str(self.filters.get("BucketCount"))
+            + "BucketNo="
+            + str(self.filters.get("BucketNo"))
+            + "FromDate="
+            + str(self.filters.get("FromDate"))
+            + "ToDate="
+            + str(self.filters.get("ToDate"))
+        )
+        headers = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Authorization": "Bearer " + self.generate_token(),
+        }
+        self.logger.info("Downloading the data...")
+        response = handle_api_response(
+            url=self.url,
+            headers=headers,
+            method="GET",
+            data=payload,
+            verify=self.verify,
+        )
+        self.logger.info("Data was downloaded successfully.")
+        return json.loads(response.text).get("MasterDataList")
+
+    @add_viadot_metadata_columns
+    def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame:
+        """Download data into a pandas DataFrame.
+
+        Args:
+            if_empty (Literal["warn", "fail", "skip"], optional): What to do if output
+                DataFrame is empty. Defaults to "skip".
+
+        Returns:
+            pd.DataFrame: DataFrame with the data.
+
+        Raises:
+            APIError: When selected API view is not available.
+        """
+        data = self.get_data()
+        df = pd.DataFrame.from_dict(data)
+        self.logger.info(
+            f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows."
+        )
+        if df.empty:
+            self._handle_if_empty(if_empty)
+
+        return df
diff --git a/src/viadot/utils.py b/src/viadot/utils.py
index ec581d9cd..0015a1332 100644
--- a/src/viadot/utils.py
+++ b/src/viadot/utils.py
@@ -11,16 +11,16 @@
 
 import pandas as pd
 import pyodbc
-import requests  # type: ignore
+import requests
 from requests.adapters import HTTPAdapter
-from requests.exceptions import (  # type: ignore
+from requests.exceptions import (
     ConnectionError,
     HTTPError,
     ReadTimeout,
     Timeout,
 )
 from requests.packages.urllib3.util.retry import Retry
-from urllib3.exceptions import ProtocolError  # type: ignore
+from urllib3.exceptions import ProtocolError
 
 from viadot.exceptions import APIError, ValidationError
 from viadot.signals import SKIP
@@ -31,7 +31,7 @@
 
 
 with contextlib.suppress(ImportError):
-    import pyspark.sql.dataframe as spark  # type: ignore
+    import pyspark.sql.dataframe as spark
 
 
 def slugify(name: str) -> str:
@@ -47,6 +47,7 @@ def handle_api_request(
     timeout: tuple = (3.05, 60 * 30),
     method: Literal["GET", "POST", "DELETE"] = "GET",
     data: str | None = None,
+    verify: bool = True,
 ) -> requests.Response:
     """Send an HTTP request to the specified URL using the provided parameters.
 
@@ -63,6 +64,7 @@
         method (Literal["GET", "POST", "DELETE"], optional): The HTTP method to use for
             the request. Defaults to "GET".
         data (str, optional): The request body data as a string. Defaults to None.
+        verify (bool, optional): Whether to verify certificates. Defaults to True.
 
     Returns:
         requests.Response: The HTTP response object.
@@ -87,6 +89,7 @@
         headers=headers,
         timeout=timeout,
         data=data,
+        verify=verify,
     )
 
 
@@ -126,6 +129,7 @@
     timeout: tuple = (3.05, 60 * 30),
    method: Literal["GET", "POST", "DELETE"] = "GET",
     data: str | None = None,
+    verify: bool = True,
 ) -> requests.models.Response:
     """Handle an HTTP response.
 
@@ -142,12 +146,13 @@
         timeout (tuple, optional): A tuple of (connect_timeout, read_timeout) in
             seconds. Defaults to (3.05, 60 * 30).
         data (str, optional): The request body data as a string. Defaults to None.
+        verify (bool, optional): Whether to verify certificates. Defaults to True.
 
     Raises:
-        ReadTimeout: Stop waiting for a response after `timeout` seconds.
-        HTTPError: The raised HTTP error.
-        ConnectionError: Raised when the client is unable to connect to the server.
-        APIError: viadot's generic API error.
+        - ConnectionError: If the connection timed out, as specified in `timeout`.
+        - ReadTimeout: If the read timed out, as specified in `timeout`.
+        - HTTPError: If the API raised an HTTPError.
+        - APIError: Generic API error.
 
     Returns:
         requests.models.Response
@@ -160,6 +165,7 @@
         timeout=timeout,
         method=method,
         data=data,
+        verify=verify,
     )
 
     return _handle_response(response)
diff --git a/tests/unit/test_business_core.py b/tests/unit/test_business_core.py
new file mode 100644
index 000000000..80df7117b
--- /dev/null
+++ b/tests/unit/test_business_core.py
@@ -0,0 +1,61 @@
+from unittest.mock import Mock, patch
+
+import pandas as pd
+import pytest
+
+from viadot.sources.business_core import BusinessCore
+
+
+@pytest.fixture(scope="module")
+def business_core():
+    return BusinessCore(
+        url="https://api.businesscore.ae/api/GetCustomerData",
+        filters={
+            "BucketCount": 10,
+            "BucketNo": 1,
+            "FromDate": None,
+            "ToDate": None,
+        },
+        credentials={
+            "username": "test",
+            "password": "test123",  # pragma: allowlist secret
+        },
+    )
+
+
+@patch("viadot.sources.business_core.handle_api_response")
+def test_generate_token(mock_api_response, business_core):
+    mock_api_response.return_value = Mock(text='{"access_token": "12345"}')
+    token = business_core.generate_token()
+    t = "12345"
+    assert token == t
+
+
+def test__clean_filters(business_core):
+    filters_raw = {
+        "BucketCount": 10,
+        "BucketNo": 1,
+        "FromDate": None,
+        "ToDate": None,
+    }
+    filters_clean = business_core._clean_filters(filters_raw)
+    assert filters_clean == {
+        "BucketCount": 10,
+        "BucketNo": 1,
+        "FromDate": "&",
+        "ToDate": "&",
+    }
+
+
+def test_to_df(business_core):
+    with patch.object(
+        business_core,
+        "get_data",
+        return_value=[{"id": 1, "name": "John Doe"}],
+    ):
+        df = business_core.to_df()
+        assert isinstance(df, pd.DataFrame)
+        assert len(df.columns) == 4
+        assert len(df) == 1
+        assert df["id"].tolist() == [1]
+        assert df["name"].tolist() == ["John Doe"]
diff --git a/tests/unit/test_customer_gauge.py b/tests/unit/test_customer_gauge.py
index b1e7976fe..4a823f51f 100644
--- a/tests/unit/test_customer_gauge.py
+++ b/tests/unit/test_customer_gauge.py
@@ -2,7 +2,7 @@
 
 from unittest.mock import MagicMock, patch
 
-import pytest  # type: ignore
+import pytest
 
 from viadot.exceptions import APIError
 from viadot.sources.customer_gauge import CustomerGauge, CustomerGaugeCredentials
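
Example usage: a minimal sketch of how the new business_core_to_parquet flow could be invoked once this change lands. The output path, secret name, and filter values below are hypothetical placeholders; url must point at one of the views whitelisted in BusinessCore.get_data, and at least one of credentials_secret or config_key has to resolve to a username/password pair, otherwise the task raises MissingSourceCredentialsError.

from viadot.orchestration.prefect.flows import business_core_to_parquet

if __name__ == "__main__":
    business_core_to_parquet(
        # Hypothetical output location and credentials reference -- adjust to your setup.
        path="business_core/get_customer_data.parquet",
        url="https://api.businesscore.ae/api/GetCustomerData",
        filters={"BucketCount": 10, "BucketNo": 1, "FromDate": None, "ToDate": None},
        credentials_secret="business-core",  # or config_key="BusinessCore"
        if_exists="replace",
    )

Run directly like this, Prefect executes the flow locally; in production it would typically be registered as a deployment alongside the other extract flows in this package.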