diff --git a/.gitignore b/.gitignore
index 7c0f1ee72..8e25bef26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,4 +159,4 @@ sap_netweaver_rfc
 
 # Databricks-connect
 .databricks-connect
-.dotnet
\ No newline at end of file
+.dotnet
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe6a083b3..20efb4496 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+
 ## [Unreleased]
 
 ### Added
@@ -10,6 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+
+## [0.4.20] - 2023-10-12
+### Added
+- Added the `Office365-REST-Python-Client` library to `requirements`.
+- Added the `GetSalesQuotationData` view to the `BusinessCore` source.
+- Added the new ViewType `queue_interaction_detail_view` to Genesys.
+- Added a new column, `_viadot_source`, to the BigQuery extraction.
+
+### Changed
+- Changed the flow name from `TransformAndCatalog` to `TransformAndCatalogToLuma`.
+- Modified `add_viadot_metadata_columns` so that a `source_name` parameter can be passed to the decorator on the `to_df` function (or on any function where the DataFrame is generated).
+- Changed the `SharepointToDF` task to apply `add_viadot_metadata_columns` with `source_name="Sharepoint"`.
+- Changed `Mindful` so that credentials are passed via the `auth` parameter instead of the `header`.
+
+
 ## [0.4.19] - 2023-08-31
 ### Added
 - Added `add_viadot_metadata_columns` function that will be used as a decorator for `to_df` class methods.
diff --git a/requirements.txt b/requirements.txt
index b7574b371..768887e4a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -41,4 +41,5 @@ avro-python3==1.10.2
 pygit2>=1.10.1, <1.11.0
 dbt-core==1.3.2
 dbt-sqlserver==1.3.1
-lumaCLI==0.0.18
\ No newline at end of file
+lumaCLI==0.0.19
+Office365-REST-Python-Client==2.4.4
diff --git a/tests/integration/flows/test_prefect_logs.py b/tests/integration/flows/test_prefect_logs.py
index 60e3f799b..82cf3e0cd 100644
--- a/tests/integration/flows/test_prefect_logs.py
+++ b/tests/integration/flows/test_prefect_logs.py
@@ -8,18 +8,13 @@ def expectation_suite():
     expectation_suite = {
         "data": {
             "project": [
-                {
-                    "id": "6f413380-e228-4d64-8e1b-41c6cd434a2a",
-                    "name": "Installer Engagement",
-                    "flows": [],
-                },
                 {
                     "id": "223a8acf-4cf0-4cf7-ae1f-b66f78e28813",
-                    "name": "oso_reporting",
+                    "name": "Admin",
                     "flows": [
                         {
                             "id": "b13dcc6d-b621-4acd-88be-2cf28715a7c5",
-                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "name": "1-raw table_catalog extract",
                             "version": 3,
                             "flow_runs": [
                                 {
@@ -34,23 +29,18 @@ def expectation_suite():
                         },
                         {
                             "id": "14b1a89e-f902-48a1-b6df-43cacdb91e1a",
-                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "name": "1-raw table_catalog extract",
                             "version": 2,
                             "flow_runs": [],
                         },
                         {
                             "id": "a1eace09-38b4-46bf-bacf-a5d29bdbb633",
-                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "name": "1-raw table_catalog extract",
                             "version": 1,
                             "flow_runs": [],
                         },
                     ],
                 },
-                {
-                    "id": "844372db-2d22-495d-a343-b8f8cbcf8963",
-                    "name": "sap",
-                    "flows": [],
-                },
                 {
                     "id": "512d0f29-2ceb-4177-b7d8-c5908da666ef",
                     "name": "integrations",
@@ -61,16 +51,6 @@ def expectation_suite():
                     "name": "dev_cdl",
                     "flows": [],
                 },
-                {
-                    "id": "e2a926e2-ec86-4900-a24e-330a44b6cb19",
-                    "name": "cic_test",
-                    "flows": [],
-                },
-                {
-                    "id": "667d5026-2f01-452a-b6fe-5437ca833066",
-                    "name": "cic_dev",
-                    "flows": [],
-                },
{ "id": "eac9b6d4-725a-4354-bf8f-25e7828ea2d8", "name": "Admin", @@ -86,31 +66,11 @@ def expectation_suite(): "name": "cic", "flows": [], }, - { - "id": "dd2ccc32-2163-4f55-a746-1dbc6b28aaa4", - "name": "Hyperlocal", - "flows": [], - }, - { - "id": "7131c357-bad7-43cf-aabc-87f9cf045384", - "name": "Installer Segmentation", - "flows": [], - }, - { - "id": "94a8b8bf-14fa-4b64-ab78-af1d332dedd4", - "name": "Marketing KPI", - "flows": [], - }, { "id": "ebe0e5aa-4add-4440-8c1a-6f9c74eb29fe", "name": "dev", "flows": [], }, - { - "id": "b5d924b0-4116-479f-a8f5-e28f9a9051ca", - "name": "velux", - "flows": [], - }, ] } } @@ -127,7 +87,7 @@ def test_prefect_logs(expectation_suite): id name flows ( - where : {name: {_eq: "1-raw google_analytics_oso_sps_gb extract"}} + where : {name: {_eq: "1-raw table_catalog extract"}} ) { id name @@ -156,7 +116,7 @@ def test_prefect_logs(expectation_suite): scheduled_start_time="2022-09-05", filter_type="_gte", local_file_path=f"prefect_extract_logs.parquet", - adls_path=f"raw/supermetrics/mp/prefect_extract_logs.parquet", + adls_path=f"raw/tests/prefect_extract_logs.parquet", ) results = flow.run() diff --git a/tests/integration/tasks/test_bigquery.py b/tests/integration/tasks/test_bigquery.py index 3cf184d85..304c31f34 100644 --- a/tests/integration/tasks/test_bigquery.py +++ b/tests/integration/tasks/test_bigquery.py @@ -17,7 +17,7 @@ def test_bigquery_to_df_success(): credentials_key=CREDENTIALS_KEY, ) df = bigquery_to_df_task.run() - expected_column = ["my_value"] + expected_column = ["my_value", "_viadot_source"] assert isinstance(df, pd.DataFrame) assert expected_column == list(df.columns) diff --git a/tests/integration/test_mindful.py b/tests/integration/test_mindful.py index c7088913c..1718a9db9 100644 --- a/tests/integration/test_mindful.py +++ b/tests/integration/test_mindful.py @@ -10,9 +10,10 @@ os.system("clear") credentials_mindful = local_config["MINDFUL"] -header = { - "Authorization": f"Bearer {credentials_mindful.get('VAULT')}", -} +auth = ( + credentials_mindful["CUSTOMER_UUID"], + credentials_mindful["AUTH_TOKEN"], +) class MockClass: @@ -42,14 +43,14 @@ def json(): @pytest.mark.init def test_instance_mindful(): - mf = Mindful(header=header) + mf = Mindful(auth=auth) assert isinstance(mf, Mindful) @mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) @pytest.mark.connect def test_mindful_api_response(mock_connection): - mf = Mindful(header=header) + mf = Mindful(auth=auth) mf.get_interactions_list() mf.get_responses_list() mock_connection.call_count == 2 @@ -58,7 +59,7 @@ def test_mindful_api_response(mock_connection): @mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) @pytest.mark.connect def test_mindful_api_response2(mock_api_response): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_interactions_list() @@ -69,7 +70,7 @@ def test_mindful_api_response2(mock_api_response): @mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) @pytest.mark.connect def test_mindful_api_response3(mock_api_response): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_responses_list() @@ -80,7 +81,7 @@ def test_mindful_api_response3(mock_api_response): @mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) @pytest.mark.connect def test_mindful_api_response4(mock_api_response): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_survey_list() @@ -91,7 +92,7 @@ def 
test_mindful_api_response4(mock_api_response): @mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass) @pytest.mark.save def test_mindful_interactions(mock_connection): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_interactions_list() mf.response_to_file(response) assert mf.endpoint == "interactions" and isinstance(mf.endpoint, str) @@ -103,7 +104,7 @@ def test_mindful_interactions(mock_connection): @mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass) @pytest.mark.save def test_mindful_responses(mock_connection): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_responses_list() mf.response_to_file(response) @@ -115,7 +116,7 @@ def test_mindful_responses(mock_connection): @mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass) @pytest.mark.save def test_mindful_surveys(mock_connection): - mf = Mindful(header=header) + mf = Mindful(auth=auth) response = mf.get_survey_list() mf.response_to_file(response) diff --git a/tests/test_viadot.py b/tests/test_viadot.py index 3241757ff..bffda072a 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.19" + assert __version__ == "0.4.20" diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 7a1b84fe3..a94eaff9f 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -16,16 +16,22 @@ class ClassForDecorator: + source = "Source_name" + def __init__(self): self.df = pd.DataFrame({"a": [123], "b": ["abc"]}) def to_df(self): return self.df - @add_viadot_metadata_columns + @add_viadot_metadata_columns() def to_df_decorated(self): return self.df + @add_viadot_metadata_columns(source) + def to_df_decorated_parameter(self): + return self.df + def test_single_quotes_inside(): TEST_VALUE = "a'b" @@ -138,3 +144,12 @@ def test_add_viadot_metadata_columns_base(): assert df_base.columns.to_list() == ["a", "b"] assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"] assert df_decorated["_viadot_source"][0] == "ClassForDecorator" + + +def test_add_viadot_metadata_columns_with_parameter(): + df_base = ClassForDecorator().to_df() + df_decorated = ClassForDecorator().to_df_decorated_parameter() + + assert df_base.columns.to_list() == ["a", "b"] + assert df_decorated.columns.to_list() == ["a", "b", "_viadot_source"] + assert df_decorated["_viadot_source"][0] == "Source_name" diff --git a/viadot/__init__.py b/viadot/__init__.py index 8f584e622..b4ed79e09 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.19" +__version__ = "0.4.20" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index c5e1736c7..de2f618ab 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -45,5 +45,5 @@ from .sql_server_to_duckdb import SQLServerToDuckDB from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform -from .transform_and_catalog import TransformAndCatalog +from .transform_and_catalog import TransformAndCatalogToLuma from .vid_club_to_adls import VidClubToADLS diff --git a/viadot/flows/transform_and_catalog.py b/viadot/flows/transform_and_catalog.py index 11dc12cc4..1de5c4430 100644 --- a/viadot/flows/transform_and_catalog.py +++ b/viadot/flows/transform_and_catalog.py @@ -5,11 +5,12 @@ from prefect import Flow, task from prefect.tasks.shell import ShellTask +from prefect.triggers import any_successful from viadot.tasks 
 
 
-@task
+@task(trigger=any_successful)
 def _cleanup_repo(dbt_repo_dir_name: str) -> None:
     """
     Remove a repo folder.
@@ -20,13 +21,32 @@ def _cleanup_repo(dbt_repo_dir_name: str) -> None:
     """
     shutil.rmtree(dbt_repo_dir_name, ignore_errors=True)  # Delete folder on run
 
 
-class TransformAndCatalog(Flow):
+@task(trigger=any_successful)
+def custom_shell_task(name: str, command: str, helper_script: str = None) -> None:
+    """
+    Task wrapping ShellTask so that a `trigger` can be applied to it, which is not possible with a regular ShellTask.
+
+    Args:
+        name (str): The name of the task.
+        command (str): The shell command to run.
+        helper_script (str, optional): The path to the local repo. Defaults to None.
+    """
+    ShellTask(
+        name=name,
+        command=command,
+        helper_script=helper_script,
+        return_all=True,
+        stream_output=True,
+    ).run()
+
+
+class TransformAndCatalogToLuma(Flow):
     """Build specified dbt model(s) and upload the generated metadata to Luma catalog."""
 
     def __init__(
         self,
         name: str,
-        dbt_project_path: str,
+        dbt_project_path: str = "tmp_dbt_repo_dir",
         dbt_repo_url: str = None,
         dbt_repo_url_secret: str = None,
         dbt_repo_branch: str = None,
@@ -35,7 +55,6 @@ def __init__(
         local_dbt_repo_path: str = None,
         dbt_selects: Dict[str, str] = None,
         dbt_target: str = None,
-        stateful: bool = False,
         metadata_dir_path: Union[str, Path] = None,
         luma_url: str = "http://localhost",
         luma_url_secret: str = None,
@@ -48,8 +67,8 @@ def __init__(
 
         Args:
             name (str): The name of the Flow.
-            dbt_project_path (str): The path to the dbt project (the directory containing
-                the `dbt_project.yml` file).
+            dbt_project_path (str, optional): The path to the dbt project (the directory containing
+                the `dbt_project.yml` file). Defaults to 'tmp_dbt_repo_dir'.
             dbt_repo_url (str, optional): The URL for cloning the dbt repo with relevant dbt project. Defaults to None.
             dbt_repo_url_secret (str, optional): Alternatively to above, the secret containing `dbt_repo_url`.
                 Defaults to None.
@@ -62,8 +81,6 @@ def __init__(
                 from run's, as long as run select is provided. Defaults to None.
             dbt_target (str): The dbt target to use. If not specified, the default dbt target (as specified in
                 `profiles.yaml`) will be used. Defaults to None.
-            stateful (bool, optional): Whether only the models should be rebuilt only if modified.
-                See [dbt docs](https://docs.getdbt.com/guides/legacy/understanding-state). Defaults to False.
             metadata_dir_path (Union[str, Path]): The path to the directory containing metadata files.
                 In the case of dbt, it's dbt project's `target` directory, which contains dbt artifacts
                 (`sources.json`, `catalog.json`, `manifest.json`, and `run_results.json`). Defaults to None.
@@ -78,19 +95,20 @@ def __init__(
         # Build a single model
         ```python
         import os
-        from viadot.flows import TransformAndCatalog
+        from viadot.flows import TransformAndCatalogToLuma
 
         my_dbt_project_path = os.path.expanduser("~/dbt/my_dbt_project")
 
-        flow = TransformAndCatalog(
-            name="Transform and Catalog",
+        flow = TransformAndCatalogToLuma(
+            name="Transform and Catalog to Luma",
             dbt_project_path=my_dbt_project_path,
             dbt_repo_url=my_dbt_repo_url,
             token=my_token,
-            dbt_selects={"run": "my_model",
-                "source_freshness": "source:schema.table",
-                "test": "my_model"},
-            metadata_dir_path="target",
+            dbt_selects={
+                "run": "my_model",
+                "source_freshness": "source:schema.table",
+                "test": "my_model"},
+            metadata_dir_path=f"{my_dbt_project_path}/target",
             luma_url="http://localhost"
         )
         flow.run()
@@ -106,8 +124,6 @@ def __init__(
         self.dbt_target = dbt_target
         self.dbt_selects = dbt_selects
 
-        self.stateful = stateful
-
         # CloneRepo
         self.dbt_repo_url = dbt_repo_url
         self.dbt_repo_url_secret = dbt_repo_url_secret
@@ -137,7 +153,7 @@ def gen_flow(self) -> Flow:
         local_dbt_repo_path = (
             os.path.expandvars(self.local_dbt_repo_path)
             if self.local_dbt_repo_path is not None
-            else "tmp_dbt_repo_dir"
+            else f"{self.dbt_project_path}"
         )
 
         clone_repo = CloneRepo(url=dbt_repo_url)
@@ -159,7 +175,7 @@ def gen_flow(self) -> Flow:
         dbt_clean_up = ShellTask(
             name="dbt_task_clean",
             command=f"dbt clean",
-            helper_script=f"cd {self.dbt_project_path}",
+            helper_script=f"cd {local_dbt_repo_path}",
             return_all=True,
             stream_output=True,
         ).bind(flow=self)
@@ -167,7 +183,7 @@ def gen_flow(self) -> Flow:
         pull_dbt_deps = ShellTask(
             name="dbt_task_deps",
             command=f"dbt deps",
-            helper_script=f"cd {self.dbt_project_path}",
+            helper_script=f"cd {local_dbt_repo_path}",
             return_all=True,
             stream_output=True,
         ).bind(flow=self)
@@ -178,7 +194,7 @@ def gen_flow(self) -> Flow:
         run = ShellTask(
             name="dbt_task_run",
             command=f"dbt run {run_select_safe} {dbt_target_option}",
-            helper_script=f"cd {self.dbt_project_path}",
+            helper_script=f"cd {local_dbt_repo_path}",
             return_all=True,
             stream_output=True,
         ).bind(flow=self)
@@ -189,20 +205,20 @@ def gen_flow(self) -> Flow:
         test = ShellTask(
             name="dbt_task_test",
             command=f"dbt test {test_select_safe} {dbt_target_option}",
-            helper_script=f"cd {self.dbt_project_path}",
+            helper_script=f"cd {local_dbt_repo_path}",
             return_all=True,
             stream_output=True,
         ).bind(flow=self)
 
         # Generate docs
         # Produces `catalog.json`, `run-results.json`, and `manifest.json`
-        generate_catalog_json = ShellTask(
+
+        generate_catalog_json = custom_shell_task.bind(
             name="dbt_task_docs_generate",
             command=f"dbt docs generate {dbt_target_option} --no-compile",
             helper_script=f"cd {self.dbt_project_path}",
-            return_all=True,
-            stream_output=True,
-        ).bind(flow=self)
+            flow=self,
+        )
 
         # Upload build metadata to Luma
         path_expanded = os.path.expandvars(self.metadata_dir_path)
diff --git a/viadot/sources/bigquery.py b/viadot/sources/bigquery.py
index 910865bc1..1be69e866 100644
--- a/viadot/sources/bigquery.py
+++ b/viadot/sources/bigquery.py
@@ -7,6 +7,7 @@
 from ..config import local_config
 from ..exceptions import CredentialError, DBDataAccessError
 from .base import Source
+from ..utils import add_viadot_metadata_columns
 
 
 class BigQuery(Source):
@@ -100,9 +101,11 @@ def get_project_id(self) -> str:
         """
         return self.credentials["project_id"]
 
+    @add_viadot_metadata_columns()
     def query_to_df(self, query: str) -> pd.DataFrame:
         """
         Query throught Bigquery table.
+        `add_viadot_metadata_columns` adds one column, `_viadot_source`, to the DataFrame.
 
         Args:
             query (str): SQL-Like Query to return data values.
diff --git a/viadot/sources/business_core.py b/viadot/sources/business_core.py
index 3495cecd9..bd02b00f0 100644
--- a/viadot/sources/business_core.py
+++ b/viadot/sources/business_core.py
@@ -150,6 +150,7 @@ def to_df(self, if_empty: Literal["warn", "fail", "skip"] = "skip") -> pd.DataFr
             "GetSalesInvoiceData",
             "GetSalesReturnDetailData",
             "GetSalesOrderData",
+            "GetSalesQuotationData",
         ]:
             raise APIError(f"View {view} currently not available.")
diff --git a/viadot/sources/genesys.py b/viadot/sources/genesys.py
index 4556ededf..6be907a66 100644
--- a/viadot/sources/genesys.py
+++ b/viadot/sources/genesys.py
@@ -320,6 +320,7 @@ def download_all_reporting_exports(
             date = self.start_date.replace("-", "")
             if single_report[4].lower() in [
                 "queue_performance_detail_view",
+                "queue_interaction_detail_view",
                 "agent_status_detail_view",
             ]:
                 file_name = f"{self.view_type.upper()}_{next(self.count)}_{date}"
diff --git a/viadot/sources/mindful.py b/viadot/sources/mindful.py
index f1334e5b8..254eecb9d 100644
--- a/viadot/sources/mindful.py
+++ b/viadot/sources/mindful.py
@@ -1,11 +1,12 @@
 import os
-from datetime import datetime, timedelta
 from io import StringIO
-from typing import Any, Dict, Literal
+from datetime import datetime, timedelta
+from typing import Any, Dict, Literal, Tuple
 
 import pandas as pd
 import prefect
 from requests.models import Response
+from requests.auth import HTTPBasicAuth
 
 from viadot.exceptions import APIError
 from viadot.sources.base import Source
@@ -15,7 +16,7 @@ class Mindful(Source):
     def __init__(
         self,
-        header: str,
+        auth: Tuple[str, str],
         region: Literal["us1", "us2", "us3", "ca1", "eu1", "au1"] = "eu1",
         start_date: datetime = None,
         end_date: datetime = None,
@@ -27,7 +28,7 @@ def __init__(
         """Mindful connector which allows listing and downloading into Data Frame or specified format output.
 
         Args:
-            header (str): Header with credentials for calling Mindful API.
+            auth (Tuple[str, str]): Authentication credentials (user, password) for calling the Mindful API.
             region (Literal[us1, us2, us3, ca1, eu1, au1], optional): SD region from where to interact with the mindful API. Defaults to "eu1".
             start_date (datetime, optional): Start date of the request. Defaults to None.
             end_date (datetime, optional): End date of the resquest. Defaults to None.
@@ -73,7 +74,7 @@ def __init__(
         )
 
         self.file_extension = file_extension
-        self.header = header
+        self.auth = auth
 
     def _mindful_api_response(
         self,
@@ -94,8 +95,8 @@ def _mindful_api_response(
         response = handle_api_response(
             url=f"https://{self.region}surveydynamix.com/api/{endpoint}",
             params=params,
-            headers=self.header,
             method="GET",
+            auth=HTTPBasicAuth(*self.auth),
         )
 
         return response
diff --git a/viadot/tasks/cloud_for_customers.py b/viadot/tasks/cloud_for_customers.py
index 09dc399b9..6e1ca6b9a 100644
--- a/viadot/tasks/cloud_for_customers.py
+++ b/viadot/tasks/cloud_for_customers.py
@@ -159,9 +159,9 @@ def run(
         vault_name: str = None,
     ):
         """
-        Task for downloading data from the Cloud for Customers to a pandas DataFrame using normal URL (with query parameters).
-        This task grab data from table from 'scratch' with passing table name in url or endpoint. It is rocommended to add
-        some filters parameters in this case.
+        Task for downloading data from the Cloud for Customers to a pandas DataFrame using a URL (with query parameters).
+        Data is obtained from the table by passing the table name in the URL or endpoint. It is recommended to add filter
+        parameters in this case.
 
         Example:
             url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi"
@@ -169,12 +169,12 @@ def run(
             endpoint = "ServiceRequestCollection"
             params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}
 
         Args:
-            url (str, optional): The url to the API in case of prepared report. Defaults to None.
-            env (str, optional): The environment to use. Defaults to 'QA'.
+            url (str, optional): The URL to the API, used in the case of a prepared report. Defaults to None.
+            env (str, optional): The environment to use to obtain credentials. Defaults to 'QA'.
             endpoint (str, optional): The endpoint of the API. Defaults to None.
             fields (List[str], optional): The C4C Table fields. Defaults to None.
-            params (Dict[str, str]): Query parameters. Defaults to $format=json.
-            chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor.
+            params (Dict[str, str]): Query parameters. Defaults to None.
+            chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor.
+                Defaults to None.
             if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
             credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
                 with C4C credentials. Defaults to None.
@@ -211,7 +213,9 @@ def run(
 
         def _generate_chunks() -> Generator[pd.DataFrame, None, None]:
             """
-            Util returning chunks as a generator to save memory.
+            Util function returning chunks as a generator, to save memory.
+
+            Returns: Generator[pd.DataFrame, None, None]
             """
             offset = 0
             total_record_count = 0
diff --git a/viadot/tasks/luma.py b/viadot/tasks/luma.py
index 9b9ad9095..5b78ebc27 100644
--- a/viadot/tasks/luma.py
+++ b/viadot/tasks/luma.py
@@ -1,7 +1,5 @@
 import json
-
 from prefect.tasks.shell import ShellTask
-
 from .azure_key_vault import AzureKeyVaultSecret
@@ -41,9 +39,7 @@ def __init__(
         self.helper_script = dbt_project_path
         self.url = url
         self.metadata_dir_path = metadata_dir_path
-        self.command = (
-            f"luma dbt ingest --luma-url {url} --metadata-dir {metadata_dir_path} "
-        )
+        self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path}"
         self.return_all = True
         self.stream_output = True
         self.log_stderr = True
diff --git a/viadot/tasks/mindful.py b/viadot/tasks/mindful.py
index fcfbba993..54d195009 100644
--- a/viadot/tasks/mindful.py
+++ b/viadot/tasks/mindful.py
@@ -117,12 +117,13 @@ def run(
             credentials_mindful = None
             raise CredentialError("Credentials not found.")
 
-        header = {
-            "Authorization": f"Bearer {credentials_mindful.get('VAULT')}",
-        }
+        auth = (
+            credentials_mindful["CUSTOMER_UUID"],
+            credentials_mindful["AUTH_TOKEN"],
+        )
 
         mindful = Mindful(
-            header=header,
+            auth=auth,
             region=region,
             start_date=start_date,
             end_date=end_date,
diff --git a/viadot/tasks/sharepoint.py b/viadot/tasks/sharepoint.py
index fcceb1dbf..7ba9c4d41 100644
--- a/viadot/tasks/sharepoint.py
+++ b/viadot/tasks/sharepoint.py
@@ -12,6 +12,7 @@
 from ..exceptions import ValidationError
 from ..sources import Sharepoint
 from .azure_key_vault import AzureKeyVaultSecret
+from ..utils import add_viadot_metadata_columns
 
 logger = logging.get_logger()
@@ -147,6 +148,7 @@ def split_sheet(
         "sheet_number",
         "validate_excel_file",
     )
+    @add_viadot_metadata_columns(source_name="Sharepoint")
     def run(
         self,
         path_to_file: str = None,
diff --git a/viadot/utils.py b/viadot/utils.py
index 14659c702..2b4c80538 100644
--- a/viadot/utils.py
+++ b/viadot/utils.py
@@ -408,21 +408,44 @@ def check_if_empty_file(
         handle_if_empty_file(if_empty, message=f"Input file - '{path}' is empty.")
 
 
-def add_viadot_metadata_columns(func: Callable) -> Callable:
+def add_viadot_metadata_columns(source_name: str = None) -> Callable:
     """
     Decorator that adds metadata columns to df in 'to_df' method.
     For now only _viadot_source is available because _viadot_downloaded_at_utc is added on the Flow level.
+
+    Args:
+        source_name (str, optional): The name of the source to be included in the DataFrame.
+            This should be provided when creating a DataFrame in a Task, rather than in a Source.
+            Defaults to None.
+
+    Warning: Please remember to include the parentheses when applying the decorator, even if you are not passing the 'source_name' parameter.
+
+    Example:
+
+    In a task:
+
+    @add_viadot_metadata_columns(source_name="Sharepoint")
+    def to_df(self):
+        ...
+
+    In a source:
+
+    @add_viadot_metadata_columns()
+    def to_df(self):
+        ...
     """
 
-    @functools.wraps(func)
-    def wrapper(*args, **kwargs) -> pd.DataFrame:
-        df = func(*args, **kwargs)
+    def decorator(func) -> Callable:
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs) -> pd.DataFrame:
+            df = func(*args, **kwargs)
+
+            df["_viadot_source"] = (
+                source_name if source_name is not None else args[0].__class__.__name__
+            )
+
+            return df
 
-        # Accessing instance
-        instance = args[0]
-        _viadot_source = kwargs.get("source_name") or instance.__class__.__name__
-        df["_viadot_source"] = _viadot_source
-        # df["_viadot_downloaded_at_utc"] = datetime.now(timezone.utc).replace(microsecond=0)
-        return df
+        return wrapper
 
-    return wrapper
+    return decorator
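For quick reference, below is a minimal, self-contained sketch of the decorator behaviour this patch introduces, mirroring the rewritten `viadot/utils.py` and the assertions in `tests/unit/test_utils.py`. The `DemoSource` and `DemoTask` class names are illustrative only; pandas is the only dependency.

```python
import functools
from typing import Callable

import pandas as pd


def add_viadot_metadata_columns(source_name: str = None) -> Callable:
    """Same parameterized decorator as in viadot/utils.py above."""

    def decorator(func) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> pd.DataFrame:
            df = func(*args, **kwargs)
            # Fall back to the instance's class name when no explicit source_name is given.
            df["_viadot_source"] = (
                source_name if source_name is not None else args[0].__class__.__name__
            )
            return df

        return wrapper

    return decorator


class DemoSource:
    # In a source: no argument, but note the parentheses are now required.
    @add_viadot_metadata_columns()
    def to_df(self) -> pd.DataFrame:
        return pd.DataFrame({"a": [123], "b": ["abc"]})


class DemoTask:
    # In a task: pass an explicit source_name, as SharepointToDF does.
    @add_viadot_metadata_columns(source_name="Sharepoint")
    def to_df(self) -> pd.DataFrame:
        return pd.DataFrame({"a": [123], "b": ["abc"]})


print(DemoSource().to_df()["_viadot_source"][0])  # "DemoSource"
print(DemoTask().to_df()["_viadot_source"][0])    # "Sharepoint"
```

Because the decorator is now a decorator factory, it must always be called — `@add_viadot_metadata_columns()` rather than `@add_viadot_metadata_columns` — which is why `ClassForDecorator.to_df_decorated` gained parentheses in the unit tests above.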