Skip to content

Commit

Permalink
Merge branch '2.0' into eurostat_2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafalz13 authored Oct 4, 2024
2 parents 8b1cdce + d406f2f commit c3589a1
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

::: viadot.orchestration.prefect.flows.bigquery_to_adls

::: viadot.orchestration.prefect.flows.business_core_to_parquet

::: viadot.orchestration.prefect.flows.cloud_for_customers_to_adls

::: viadot.orchestration.prefect.flows.cloud_for_customers_to_databricks
Expand Down
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

::: viadot.orchestration.prefect.tasks.bigquery_to_df

::: viadot.orchestration.prefect.tasks.business_core.business_core_to_df

::: viadot.orchestration.prefect.tasks.cloud_for_customers_to_df

::: viadot.orchestration.prefect.tasks.create_sql_server_table
Expand Down
2 changes: 2 additions & 0 deletions docs/references/sources/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

::: viadot.sources.bigquery.BigQuery

::: viadot.sources.business_core.BusinessCore

::: viadot.sources.cloud_for_customers.CloudForCustomers

::: viadot.sources.customer_gauge.CustomerGauge
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .azure_sql_to_adls import azure_sql_to_adls
from .bigquery_to_adls import bigquery_to_adls
from .business_core_to_parquet import business_core_to_parquet
from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
from .customer_gauge_to_adls import customer_gauge_to_adls
Expand Down Expand Up @@ -38,6 +39,7 @@
__all__ = [
"azure_sql_to_adls",
"bigquery_to_adls",
"business_core_to_parquet",
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
"customer_gauge_to_adls",
Expand Down
61 changes: 61 additions & 0 deletions src/viadot/orchestration/prefect/flows/business_core_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Flow for downloading data from Business Core API to a Parquet file."""

from typing import Any, Literal

from prefect import flow

from viadot.orchestration.prefect.tasks.business_core import business_core_to_df
from viadot.orchestration.prefect.tasks.task_utils import df_to_parquet


@flow(
name="extract--businesscore--parquet",
description="Extract data from Business Core API and load it into Parquet file",
retries=1,
retry_delay_seconds=60,
)
def business_core_to_parquet(
path: str | None = None,
url: str | None = None,
filters: dict[str, Any] | None = None,
credentials_secret: str | None = None,
config_key: str | None = None,
if_empty: str = "skip",
if_exists: Literal["append", "replace", "skip"] = "replace",
verify: bool = True,
) -> None:
"""Download data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API.
Defaults to None.
filters (dict[str, Any], optional): Filters in form of dictionary.
Available filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'.
Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business
Core credentials. Defaults to None.
More info on: https://docs.prefect.io/concepts/blocks/
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty.
Defaults to "skip".
if_exists (Literal["append", "replace", "skip"], optional):
What to do if the table exists. Defaults to "replace".
verify (bool, optional): Whether or not verify certificates while
connecting to an API. Defaults to True.
"""
df = business_core_to_df(
url=url,
path=path,
credentials_secret=credentials_secret,
config_key=config_key,
filters=filters,
if_empty=if_empty,
verify=verify,
)
return df_to_parquet(
df=df,
path=path,
if_exists=if_exists,
)
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .azure_sql import azure_sql_to_df
from .bcp import bcp
from .bigquery import bigquery_to_df
from .business_core import business_core_to_df
from .cloud_for_customers import cloud_for_customers_to_df
from .customer_gauge_to_df import customer_gauge_to_df
from .databricks import df_to_databricks
Expand Down Expand Up @@ -38,6 +39,7 @@
"bcp",
"clone_repo",
"bigquery_to_df",
"business_core_to_df",
"cloud_for_customers_to_df",
"create_sql_server_table",
"customer_gauge_to_df",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/tasks/business_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Task for downloading data from Business Core API to a Parquet file."""

from typing import Any

from pandas import DataFrame
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources.business_core import BusinessCore


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60 * 3)
def business_core_to_df(
path: str | None = None,
url: str | None = None,
filters: dict[str, Any] | None = None,
credentials_secret: str | None = None,
config_key: str | None = None,
if_empty: str = "skip",
verify: bool = True,
) -> DataFrame:
"""Download data from Business Core API to a Parquet file.
Args:
path (str, required): Path where to save the Parquet file. Defaults to None.
url (str, required): Base url to the view in Business Core API. Defaults to
None.
filters (dict[str, Any], optional): Filters in form of dictionary. Available
filters: 'BucketCount','BucketNo', 'FromDate', 'ToDate'. Defaults to None.
credentials_secret (str, optional): The name of the secret that stores Business
Core credentials. More info on: https://docs.prefect.io/concepts/blocks/.
Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
if_empty (str, optional): What to do if output DataFrame is empty. Defaults to
"skip".
verify (bool, optional): Whether or not verify certificates while connecting
to an API. Defaults to True.
"""
if not (credentials_secret or config_key):
raise MissingSourceCredentialsError

logger = get_run_logger()

credentials = get_source_credentials(config_key) or get_credentials(
credentials_secret
)

bc = BusinessCore(
url=url,
path=path,
credentials=credentials,
config_key=config_key,
filters=filters,
verify=verify,
)

df = bc.to_df(if_empty=if_empty)

nrows = df.shape[0]
ncols = df.shape[1]

logger.info(
f"Successfully downloaded {nrows} rows and {ncols} columns of data to a DataFrame."
)

return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ._trino import Trino
from .azure_sql import AzureSQL
from .bigquery import BigQuery
from .business_core import BusinessCore
from .cloud_for_customers import CloudForCustomers
from .customer_gauge import CustomerGauge
from .epicor import Epicor
Expand All @@ -29,6 +30,7 @@
__all__ = [
"AzureSQL",
"BigQuery",
"BusinessCore",
"CloudForCustomers",
"CustomerGauge",
"DuckDB",
Expand Down
164 changes: 164 additions & 0 deletions src/viadot/sources/business_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Source for connecting to Business Core API."""

import json
from typing import Any, Literal

import pandas as pd
from pydantic import BaseModel, SecretStr

from viadot.config import get_source_credentials
from viadot.exceptions import APIError
from viadot.sources.base import Source
from viadot.utils import add_viadot_metadata_columns, handle_api_response


class BusinessCoreCredentials(BaseModel):
"""Business Core credentials.
Uses simple authentication:
- username: The user name to use.
- password: The password to use.
"""

username: str
password: SecretStr


class BusinessCore(Source):
"""Business Core ERP API connector."""

def __init__(
self,
url: str | None = None,
filters: dict[str, Any] | None = None,
credentials: dict[str, Any] | None = None,
config_key: str = "BusinessCore",
verify: bool = True,
*args,
**kwargs,
):
"""Create a BusinessCore connector instance.
Args:
url (str, optional): Base url to a view in Business Core API.
Defaults to None.
filters (dict[str, Any], optional): Filters in form of dictionary. Available
filters: 'BucketCount', 'BucketNo', 'FromDate', 'ToDate'. Defaults to
None.
credentials (dict[str, Any], optional): Credentials stored in a dictionary.
Required credentials: username, password. Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to "BusinessCore".
verify (bool, optional): Whether or not verify certificates while connecting
to an API. Defaults to True.
"""
raw_creds = credentials or get_source_credentials(config_key)
validated_creds = dict(BusinessCoreCredentials(**raw_creds))

self.url = url
self.filters = self._clean_filters(filters)
self.verify = verify

super().__init__(*args, credentials=validated_creds, **kwargs)

def generate_token(self) -> str:
"""Generate a token for the user.
Returns:
string: The token.
"""
url = "https://api.businesscore.ae/api/user/Login"

username = self.credentials.get("username")
password = self.credentials.get("password").get_secret_value()
payload = f"grant_type=password&username={username}&password={password}&scope="
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = handle_api_response(
url=url,
headers=headers,
method="GET",
data=payload,
verify=self.verify,
)

return json.loads(response.text).get("access_token")

@staticmethod
def _clean_filters(filters: dict[str, str | None]) -> dict[str, str]:
"""Replace 'None' with '&' in a dictionary.
Required for payload in 'x-www-form-urlencoded' from.
Returns:
dict[str, str]: Dictionary with filters prepared for further use.
"""
return {key: ("&" if val is None else val) for key, val in filters.items()}

def get_data(self) -> dict[str, Any]:
"""Obtain data from Business Core API.
Returns:
dict: Dictionary with data downloaded from Business Core API.
"""
view = self.url.split("/")[-1]

if view not in [
"GetCustomerData",
"GetItemMaster",
"GetPendingSalesOrderData",
"GetSalesInvoiceData",
"GetSalesReturnDetailData",
"GetSalesOrderData",
"GetSalesQuotationData",
]:
error_message = f"View {view} currently not available."
raise APIError(error_message)

payload = (
"BucketCount="
+ str(self.filters.get("BucketCount"))
+ "BucketNo="
+ str(self.filters.get("BucketNo"))
+ "FromDate="
+ str(self.filters.get("FromDate"))
+ "ToDate"
+ str(self.filters.get("ToDate"))
)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Bearer " + self.generate_token(),
}
self.logger.info("Downloading the data...")
response = handle_api_response(
url=self.url,
headers=headers,
method="GET",
data=payload,
verify=self.verify,
)
self.logger.info("Data was downloaded successfully.")
return json.loads(response.text).get("MasterDataList")

@add_viadot_metadata_columns
def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFrame:
"""Download data into a pandas DataFrame.
Args:
if_empty (Literal["warn", "fail", "skip"], optional): What to do if output
DataFrame is empty. Defaults to "skip".
Returns:
pd.DataFrame: DataFrame with the data.
Raises:
APIError: When selected API view is not available.
"""
data = self.get_data()
df = pd.DataFrame.from_dict(data)
self.logger.info(
f"Data was successfully transformed into DataFrame: {len(df.columns)} columns and {len(df)} rows."
)
if df.empty:
self._handle_if_empty(if_empty)

return df
4 changes: 2 additions & 2 deletions src/viadot/sources/cloud_for_customers.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def get_response(
return handle_api_response(
url=url,
params=filter_params,
auth=(username, password),
auth=(username, password.get_secret_value()),
timeout=timeout,
)

Expand Down Expand Up @@ -318,7 +318,7 @@ def to_df(
tests: dict[str, Any] = kwargs.get("tests", {})

url = url or self.url
records = self.extract_records(url=url)
records = self.extract_records(url=url, report_url=self.report_url)
df = pd.DataFrame(data=records, **kwargs)

if dtype:
Expand Down
Loading

0 comments on commit c3589a1

Please sign in to comment.