From 6dce986240da9c21f16281511e3382fd74bfe3e5 Mon Sep 17 00:00:00 2001 From: jkrobicki Date: Wed, 25 Oct 2023 16:11:59 +0200 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9C=A8=20Added=20df=20validation=20task?= =?UTF-8?q?=20to=20the=20flow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/flows/outlook_to_adls.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index 3f24a65d8..c736cd236 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -9,6 +9,7 @@ df_to_csv, df_to_parquet, union_dfs_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, OutlookToDF @@ -29,6 +30,7 @@ def __init__( limit: int = 10000, timeout: int = 3600, if_exists: Literal["append", "replace", "skip"] = "append", + validation_df_dict: dict = None, outlook_credentials_secret: str = "OUTLOOK", *args: List[Any], **kwargs: Dict[str, Any], @@ -54,6 +56,8 @@ def __init__( timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append". + validation_df_dict (dict, optional): An optional dictionary to verify the received dataframe. + When passed, `validate_df` task validation tests are triggered. Defaults to None. """ self.mailbox_list = mailbox_list @@ -65,6 +69,9 @@ def __init__( self.local_file_path = local_file_path self.if_exsists = if_exists + # Validate DataFrame + self.validation_df_dict = validation_df_dict + # AzureDataLakeUpload self.adls_file_path = adls_file_path self.output_file_extension = output_file_extension @@ -98,6 +105,11 @@ def gen_flow(self) -> Flow: dfs = apply_map(self.gen_outlook_df, self.mailbox_list, flow=self) df = union_dfs_task.bind(dfs, flow=self) + + if self.validation_df_dict: + validation = validate_df(df=df, tests=self.validation_df_dict, flow=self) + validation.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) if self.output_file_extension == ".parquet": From b7f9f5673d9c31e6ac47a24732801df2994ab842 Mon Sep 17 00:00:00 2001 From: jkrobicki Date: Thu, 26 Oct 2023 09:28:52 +0200 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9C=85=20Added=20tests=20for=20outlook?= =?UTF-8?q?=5Fto=5Fadls=20flow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../integration/flows/test_outlook_to_adls.py | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 tests/integration/flows/test_outlook_to_adls.py diff --git a/tests/integration/flows/test_outlook_to_adls.py b/tests/integration/flows/test_outlook_to_adls.py new file mode 100644 index 000000000..8250235cb --- /dev/null +++ b/tests/integration/flows/test_outlook_to_adls.py @@ -0,0 +1,91 @@ +import os +from unittest import mock +from datetime import date, timedelta +from prefect.tasks.secrets import PrefectSecret + +import pandas as pd +import pytest + +from viadot.flows import OutlookToADLS + + +ADLS_FILE_NAME = "test_outlook_to_adls.parquet" +ADLS_DIR_PATH = "raw/tests/" + +start_date = date.today() - timedelta(days=1) +start_date = start_date.strftime("%Y-%m-%d") +end_date = date.today().strftime("%Y-%m-%d") + +mailbox_list = [ + "romania.tehnic@velux.com", +] + +DATA = { + "sender": ["sender@mail.com"], + "receivers": ["receiver@mail.com"], +} + + +def test_outlook_to_adls_flow_run(): + flow = OutlookToADLS( + name="Test OutlookToADLS flow run", + mailbox_list=mailbox_list, + outbox_list=["Outbox", "Sent Items"], + start_date=start_date, + end_date=end_date, + local_file_path=ADLS_FILE_NAME, + adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + if_exists="replace", + timeout=4400, + ) + + result = flow.run() + assert result.is_successful() + + +def test_outlook_to_adls_run_flow_validate_fail(): + flow = OutlookToADLS( + name="Test OutlookToADLS validate flow df fail", + mailbox_list=mailbox_list, + outbox_list=["Outbox", "Sent Items"], + start_date=start_date, + end_date=end_date, + local_file_path=ADLS_FILE_NAME, + adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + if_exists="replace", + validation_df_dict={"column_list_to_match": ["test", "wrong", "columns"]}, + timeout=4400, + ) + + result = flow.run() + assert result.is_failed() + + +@mock.patch( + "viadot.tasks.OutlookToDF.run", + return_value=pd.DataFrame(data=DATA), +) +@pytest.mark.run +def test_outlook_to_adls_run_flow_validate_success(mocked_task): + flow = OutlookToADLS( + name="Test OutlookToADLS validate flow df success", + mailbox_list=mailbox_list, + outbox_list=["Outbox", "Sent Items"], + start_date=start_date, + end_date=end_date, + local_file_path=ADLS_FILE_NAME, + adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, + adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + if_exists="replace", + validation_df_dict={"column_list_to_match": ["sender", "receivers"]}, + timeout=4400, + ) + + result = flow.run() + assert result.is_successful() + + os.remove("test_outlook_to_adls.parquet") + os.remove("romania_tehnic.csv") + os.remove("o365_token.txt") From bed6995d110233658372c19228f199a4213fb4a6 Mon Sep 17 00:00:00 2001 From: jkrobicki Date: Thu, 26 Oct 2023 09:43:35 +0200 Subject: [PATCH 3/6] Cleaned imports and secrets --- tests/integration/flows/test_outlook_to_adls.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/integration/flows/test_outlook_to_adls.py b/tests/integration/flows/test_outlook_to_adls.py index 8250235cb..ad88ce5da 100644 --- a/tests/integration/flows/test_outlook_to_adls.py +++ b/tests/integration/flows/test_outlook_to_adls.py @@ -7,8 +7,11 @@ import pytest from viadot.flows import OutlookToADLS +from viadot.tasks import AzureDataLakeRemove - +ADLS_CREDENTIAL_SECRET = PrefectSecret( + "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" +).run() ADLS_FILE_NAME = "test_outlook_to_adls.parquet" ADLS_DIR_PATH = "raw/tests/" @@ -35,7 +38,7 @@ def test_outlook_to_adls_flow_run(): end_date=end_date, local_file_path=ADLS_FILE_NAME, adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, if_exists="replace", timeout=4400, ) @@ -53,7 +56,7 @@ def test_outlook_to_adls_run_flow_validate_fail(): end_date=end_date, local_file_path=ADLS_FILE_NAME, adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, if_exists="replace", validation_df_dict={"column_list_to_match": ["test", "wrong", "columns"]}, timeout=4400, @@ -77,7 +80,7 @@ def test_outlook_to_adls_run_flow_validate_success(mocked_task): end_date=end_date, local_file_path=ADLS_FILE_NAME, adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret="App-Azure-CR-DatalakeGen2-AIA", + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, if_exists="replace", validation_df_dict={"column_list_to_match": ["sender", "receivers"]}, timeout=4400, @@ -89,3 +92,7 @@ def test_outlook_to_adls_run_flow_validate_success(mocked_task): os.remove("test_outlook_to_adls.parquet") os.remove("romania_tehnic.csv") os.remove("o365_token.txt") + rm = AzureDataLakeRemove( + path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" + ) + rm.run(sp_credentials_secret=ADLS_CREDENTIAL_SECRET) From 5a5b153445d485c63fb064f6306c786bdf67d71c Mon Sep 17 00:00:00 2001 From: jkrobicki Date: Thu, 26 Oct 2023 10:16:34 +0200 Subject: [PATCH 4/6] Corrected parameter to validate_df_dict --- tests/integration/flows/test_outlook_to_adls.py | 4 ++-- viadot/flows/outlook_to_adls.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/flows/test_outlook_to_adls.py b/tests/integration/flows/test_outlook_to_adls.py index ad88ce5da..dc215191d 100644 --- a/tests/integration/flows/test_outlook_to_adls.py +++ b/tests/integration/flows/test_outlook_to_adls.py @@ -58,7 +58,7 @@ def test_outlook_to_adls_run_flow_validate_fail(): adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, if_exists="replace", - validation_df_dict={"column_list_to_match": ["test", "wrong", "columns"]}, + validate_df_dict={"column_list_to_match": ["test", "wrong", "columns"]}, timeout=4400, ) @@ -82,7 +82,7 @@ def test_outlook_to_adls_run_flow_validate_success(mocked_task): adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, if_exists="replace", - validation_df_dict={"column_list_to_match": ["sender", "receivers"]}, + validate_df_dict={"column_list_to_match": ["sender", "receivers"]}, timeout=4400, ) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index c736cd236..28575640c 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -30,7 +30,7 @@ def __init__( limit: int = 10000, timeout: int = 3600, if_exists: Literal["append", "replace", "skip"] = "append", - validation_df_dict: dict = None, + validate_df_dict: dict = None, outlook_credentials_secret: str = "OUTLOOK", *args: List[Any], **kwargs: Dict[str, Any], @@ -56,7 +56,7 @@ def __init__( timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append". - validation_df_dict (dict, optional): An optional dictionary to verify the received dataframe. + validate_df_dict (dict, optional): An optional dictionary to verify the received dataframe. When passed, `validate_df` task validation tests are triggered. Defaults to None. """ @@ -70,7 +70,7 @@ def __init__( self.if_exsists = if_exists # Validate DataFrame - self.validation_df_dict = validation_df_dict + self.validate_df_dict = validate_df_dict # AzureDataLakeUpload self.adls_file_path = adls_file_path @@ -106,8 +106,8 @@ def gen_flow(self) -> Flow: df = union_dfs_task.bind(dfs, flow=self) - if self.validation_df_dict: - validation = validate_df(df=df, tests=self.validation_df_dict, flow=self) + if self.validate_df_dict: + validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) validation.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) From 6e396c4adccae2df793adf6715caaacd584fa56c Mon Sep 17 00:00:00 2001 From: jkrobicki Date: Thu, 26 Oct 2023 10:33:25 +0200 Subject: [PATCH 5/6] Corrected validation if statement --- viadot/flows/outlook_to_adls.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index 28575640c..606674ba1 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -107,8 +107,10 @@ def gen_flow(self) -> Flow: df = union_dfs_task.bind(dfs, flow=self) if self.validate_df_dict: - validation = validate_df(df=df, tests=self.validate_df_dict, flow=self) - validation.set_upstream(df, flow=self) + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) From 34e40cb6987849210f3976e3d3d735e72fe15fd1 Mon Sep 17 00:00:00 2001 From: Mateusz Pazdzior <59165045+m-paz@users.noreply.github.com> Date: Thu, 26 Oct 2023 11:07:45 +0100 Subject: [PATCH 6/6] Delete tests/integration/flows/test_outlook_to_adls.py --- .../integration/flows/test_outlook_to_adls.py | 98 ------------------- 1 file changed, 98 deletions(-) delete mode 100644 tests/integration/flows/test_outlook_to_adls.py diff --git a/tests/integration/flows/test_outlook_to_adls.py b/tests/integration/flows/test_outlook_to_adls.py deleted file mode 100644 index dc215191d..000000000 --- a/tests/integration/flows/test_outlook_to_adls.py +++ /dev/null @@ -1,98 +0,0 @@ -import os -from unittest import mock -from datetime import date, timedelta -from prefect.tasks.secrets import PrefectSecret - -import pandas as pd -import pytest - -from viadot.flows import OutlookToADLS -from viadot.tasks import AzureDataLakeRemove - -ADLS_CREDENTIAL_SECRET = PrefectSecret( - "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" -).run() -ADLS_FILE_NAME = "test_outlook_to_adls.parquet" -ADLS_DIR_PATH = "raw/tests/" - -start_date = date.today() - timedelta(days=1) -start_date = start_date.strftime("%Y-%m-%d") -end_date = date.today().strftime("%Y-%m-%d") - -mailbox_list = [ - "romania.tehnic@velux.com", -] - -DATA = { - "sender": ["sender@mail.com"], - "receivers": ["receiver@mail.com"], -} - - -def test_outlook_to_adls_flow_run(): - flow = OutlookToADLS( - name="Test OutlookToADLS flow run", - mailbox_list=mailbox_list, - outbox_list=["Outbox", "Sent Items"], - start_date=start_date, - end_date=end_date, - local_file_path=ADLS_FILE_NAME, - adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, - if_exists="replace", - timeout=4400, - ) - - result = flow.run() - assert result.is_successful() - - -def test_outlook_to_adls_run_flow_validate_fail(): - flow = OutlookToADLS( - name="Test OutlookToADLS validate flow df fail", - mailbox_list=mailbox_list, - outbox_list=["Outbox", "Sent Items"], - start_date=start_date, - end_date=end_date, - local_file_path=ADLS_FILE_NAME, - adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, - if_exists="replace", - validate_df_dict={"column_list_to_match": ["test", "wrong", "columns"]}, - timeout=4400, - ) - - result = flow.run() - assert result.is_failed() - - -@mock.patch( - "viadot.tasks.OutlookToDF.run", - return_value=pd.DataFrame(data=DATA), -) -@pytest.mark.run -def test_outlook_to_adls_run_flow_validate_success(mocked_task): - flow = OutlookToADLS( - name="Test OutlookToADLS validate flow df success", - mailbox_list=mailbox_list, - outbox_list=["Outbox", "Sent Items"], - start_date=start_date, - end_date=end_date, - local_file_path=ADLS_FILE_NAME, - adls_file_path=ADLS_DIR_PATH + ADLS_FILE_NAME, - adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, - if_exists="replace", - validate_df_dict={"column_list_to_match": ["sender", "receivers"]}, - timeout=4400, - ) - - result = flow.run() - assert result.is_successful() - - os.remove("test_outlook_to_adls.parquet") - os.remove("romania_tehnic.csv") - os.remove("o365_token.txt") - rm = AzureDataLakeRemove( - path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" - ) - rm.run(sp_credentials_secret=ADLS_CREDENTIAL_SECRET)