-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✨ Add
BusinessCore
source and Prefect tasks (#1051)
* 🚀 Added BusinessCore source * 🚀 Added business_core_to_df task * 🚀 Added business_core_to_parquet flow * ⚡️ added path parameter to task * ⚡️ changed credentials to credentials_secret * ⚡️ Added verify parameter * 🎨 added business_core to inits * ✨ Added BusinessCoreCredentials * ⚡️ Changed default parameters * ✅ added test for source, task and flow * ⚡️ Changed loggers * 🎨 Formatted code * 🎨 Formatted code * 🎨 Formatted code * 🐛 Fixed import error * ✅ Modified imports * ✨ Added `# pragma: allowlist secret` * 🎨 Improved code structure * 🐛 Fix docstrings * 📝 Add reference docs * 🐛 Fix doc import * ♻️ Minor refactor * ♻️ Standardize stuff --------- Co-authored-by: rziemianek <[email protected]> Co-authored-by: trymzet <[email protected]>
- Loading branch information
1 parent
573f988
commit df931e2
Showing
12 changed files
with
383 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
"""Flow for downloading data from Business Core API to a Parquet file.""" | ||
|
||
from typing import Any, Literal | ||
|
||
from prefect import flow | ||
|
||
from viadot.orchestration.prefect.tasks.business_core import business_core_to_df | ||
from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet | ||
|
||
|
||
@flow( | ||
name="extract--businesscore--parquet", | ||
description="Extract data from Business Core API and load it into Parquet file", | ||
retries=1, | ||
retry_delay_seconds=60, | ||
) | ||
def business_core_to_parquet( | ||
path: str | None = None, | ||
url: str | None = None, | ||
filters: dict[str, Any] | None = None, | ||
credentials_secret: str | None = None, | ||
config_key: str | None = None, | ||
if_empty: str = "skip", | ||
if_exists: Literal["append", "replace", "skip"] = "replace", | ||
verify: bool = True, | ||
) -> None: | ||
"""Download data from Business Core API to a Parquet file. | ||
Args: | ||
path (str, required): Path where to save the Parquet file. Defaults to None. | ||
url (str, required): Base url to the view in Business Core API. | ||
Defaults to None. | ||
filters (dict[str, Any], optional): Filters in form of dictionary. | ||
Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'. | ||
Defaults to None. | ||
credentials_secret (str, optional): The name of the secret that stores Business | ||
Core credentials. Defaults to None. | ||
More info on: https://docs.prefect.io/concepts/blocks/ | ||
config_key (str, optional): The key in the viadot config holding relevant | ||
credentials. Defaults to None. | ||
if_empty (str, optional): What to do if output DataFrame is empty. | ||
Defaults to "skip". | ||
if_exists (Literal["append", "replace", "skip"], optional): | ||
What to do if the table exists. Defaults to "replace". | ||
verify (bool, optional): Whether or not verify certificates while | ||
connecting to an API. Defaults to True. | ||
""" | ||
df = business_core_to_df( | ||
url=url, | ||
path=path, | ||
credentials_secret=credentials_secret, | ||
config_key=config_key, | ||
filters=filters, | ||
if_empty=if_empty, | ||
verify=verify, | ||
) | ||
return df_to_parquet( | ||
df=df, | ||
path=path, | ||
if_exists=if_exists, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
"""Task for downloading data from Business Core API to a Parquet file.""" | ||
|
||
from typing import Any | ||
|
||
from pandas import DataFrame | ||
from prefect import task | ||
from prefect.logging import get_run_logger | ||
|
||
from viadot.config import get_source_credentials | ||
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError | ||
from viadot.orchestration.prefect.utils import get_credentials | ||
from viadot.sources.business_core import BusinessCore | ||
|
||
|
||
@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3) | ||
def business_core_to_df( | ||
path: str | None = None, | ||
url: str | None = None, | ||
filters: dict[str, Any] | None = None, | ||
credentials_secret: str | None = None, | ||
config_key: str | None = None, | ||
if_empty: str = "skip", | ||
verify: bool = True, | ||
) -> DataFrame: | ||
"""Download data from Business Core API to a Parquet file. | ||
Args: | ||
path (str, required): Path where to save the Parquet file. Defaults to None. | ||
url (str, required): Base url to the view in Business Core API. Defaults to | ||
None. | ||
filters (dict[str, Any], optional): Filters in form of dictionary. Available | ||
filters: 'BucketCount','BucketNo', 'FromDate', 'ToDate'. Defaults to None. | ||
credentials_secret (str, optional): The name of the secret that stores Business | ||
Core credentials. More info on: https://docs.prefect.io/concepts/blocks/. | ||
Defaults to None. | ||
config_key (str, optional): The key in the viadot config holding relevant | ||
credentials. Defaults to None. | ||
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to | ||
"skip". | ||
verify (bool, optional): Whether or not verify certificates while connecting | ||
to an API. Defaults to True. | ||
""" | ||
if not (credentials_secret or config_key): | ||
raise MissingSourceCredentialsError | ||
|
||
logger = get_run_logger() | ||
|
||
credentials = get_source_credentials(config_key) or get_credentials( | ||
credentials_secret | ||
) | ||
|
||
bc = BusinessCore( | ||
url=url, | ||
path=path, | ||
credentials=credentials, | ||
config_key=config_key, | ||
filters=filters, | ||
verify=verify, | ||
) | ||
|
||
df = bc.to_df(if_empty=if_empty) | ||
|
||
nrows = df.shape[0] | ||
ncols = df.shape[1] | ||
|
||
logger.info( | ||
f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame." | ||
) | ||
|
||
return df |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
"""Source for connecting to Business Core API.""" | ||
|
||
import json | ||
from typing import Any, Literal | ||
|
||
import pandas as pd | ||
from pydantic import BaseModel, SecretStr | ||
|
||
from viadot.config import get_source_credentials | ||
from viadot.exceptions import APIError | ||
from viadot.sources.base import Source | ||
from viadot.utils import add_viadot_metadata_columns, handle_api_response | ||
|
||
|
||
class BusinessCoreCredentials(BaseModel): | ||
"""Business Core credentials. | ||
Uses simple authentication: | ||
- username: The user name to use. | ||
- password: The password to use. | ||
""" | ||
|
||
username: str | ||
password: SecretStr | ||
|
||
|
||
class BusinessCore(Source): | ||
"""Business Core ERP API connector.""" | ||
|
||
def __init__( | ||
self, | ||
url: str | None = None, | ||
filters: dict[str, Any] | None = None, | ||
credentials: dict[str, Any] | None = None, | ||
config_key: str = "BusinessCore", | ||
verify: bool = True, | ||
*args, | ||
**kwargs, | ||
): | ||
"""Create a BusinessCore connector instance. | ||
Args: | ||
url (str, optional): Base url to a view in Business Core API. | ||
Defaults to None. | ||
filters (dict[str, Any], optional): Filters in form of dictionary. Available | ||
filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'. Defaults to | ||
None. | ||
credentials (dict[str, Any], optional): Credentials stored in a dictionary. | ||
Required credentials: username, password. Defaults to None. | ||
config_key (str, optional): The key in the viadot config holding relevant | ||
credentials. Defaults to "BusinessCore". | ||
verify (bool, optional): Whether or not verify certificates while connecting | ||
to an API. Defaults to True. | ||
""" | ||
raw_creds = credentials or get_source_credentials(config_key) | ||
validated_creds = dict(BusinessCoreCredentials(**raw_creds)) | ||
|
||
self.url = url | ||
self.filters = self._clean_filters(filters) | ||
self.verify = verify | ||
|
||
super().__init__(*args, credentials=validated_creds, **kwargs) | ||
|
||
def generate_token(self) -> str: | ||
"""Generate a token for the user. | ||
Returns: | ||
string: The token. | ||
""" | ||
url = "https://api.businesscore.ae/api/user/Login" | ||
|
||
username = self.credentials.get("username") | ||
password = self.credentials.get("password").get_secret_value() | ||
payload = f"grant_type=password&username={username}&password={password}&scope=" | ||
headers = {"Content-Type": "application/x-www-form-urlencoded"} | ||
response = handle_api_response( | ||
url=url, | ||
headers=headers, | ||
method="GET", | ||
data=payload, | ||
verify=self.verify, | ||
) | ||
|
||
return json.loads(response.text).get("access_token") | ||
|
||
@staticmethod | ||
def _clean_filters(filters: dict[str, str | None]) -> dict[str, str]: | ||
"""Replace 'None' with '&' in a dictionary. | ||
Required for payload in 'x-www-form-urlencoded' from. | ||
Returns: | ||
dict[str, str]: Dictionary with filters prepared for further use. | ||
""" | ||
return {key: ("&" if val is None else val) for key, val in filters.items()} | ||
|
||
def get_data(self) -> dict[str, Any]: | ||
"""Obtain data from Business Core API. | ||
Returns: | ||
dict: Dictionary with data downloaded from Business Core API. | ||
""" | ||
view = self.url.split("/")[-1] | ||
|
||
if view not in [ | ||
"GetCustomerData", | ||
"GetItemMaster", | ||
"GetPendingSalesOrderData", | ||
"GetSalesInvoiceData", | ||
"GetSalesReturnDetailData", | ||
"GetSalesOrderData", | ||
"GetSalesQuotationData", | ||
]: | ||
error_message = f"View {view} currently not available." | ||
raise APIError(error_message) | ||
|
||
payload = ( | ||
"BucketCount=" | ||
+ str(self.filters.get("BucketCount")) | ||
+ "BucketNo=" | ||
+ str(self.filters.get("BucketNo")) | ||
+ "FromDate=" | ||
+ str(self.filters.get("FromDate")) | ||
+ "ToDate" | ||
+ str(self.filters.get("ToDate")) | ||
) | ||
headers = { | ||
"Content-Type": "application/x-www-form-urlencoded", | ||
"Authorization": "Bearer " + self.generate_token(), | ||
} | ||
self.logger.info("Downloading the data...") | ||
response = handle_api_response( | ||
url=self.url, | ||
headers=headers, | ||
method="GET", | ||
data=payload, | ||
verify=self.verify, | ||
) | ||
self.logger.info("Data was downloaded successfully.") | ||
return json.loads(response.text).get("MasterDataList") | ||
|
||
@add_viadot_metadata_columns | ||
def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame: | ||
"""Download data into a pandas DataFrame. | ||
Args: | ||
if_empty (Literal["warn", "fail", "skip"], optional): What to do if output | ||
DataFrame is empty. Defaults to "skip". | ||
Returns: | ||
pd.DataFrame: DataFrame with the data. | ||
Raises: | ||
APIError: When selected API view is not available. | ||
""" | ||
data = self.get_data() | ||
df = pd.DataFrame.from_dict(data) | ||
self.logger.info( | ||
f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows." | ||
) | ||
if df.empty: | ||
self._handle_if_empty(if_empty) | ||
|
||
return df |
Oops, something went wrong.