diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index 3f24a65d8..606674ba1 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -9,6 +9,7 @@ df_to_csv, df_to_parquet, union_dfs_task, + validate_df, ) from viadot.tasks import AzureDataLakeUpload, OutlookToDF @@ -29,6 +30,7 @@ def __init__( limit: int = 10000, timeout: int = 3600, if_exists: Literal["append", "replace", "skip"] = "append", + validate_df_dict: dict = None, outlook_credentials_secret: str = "OUTLOOK", *args: List[Any], **kwargs: Dict[str, Any], @@ -54,6 +56,8 @@ def __init__( timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append". + validate_df_dict (dict, optional): An optional dictionary to verify the received dataframe. + When passed, `validate_df` task validation tests are triggered. Defaults to None. """ self.mailbox_list = mailbox_list @@ -65,6 +69,9 @@ def __init__( self.local_file_path = local_file_path self.if_exsists = if_exists + # Validate DataFrame + self.validate_df_dict = validate_df_dict + # AzureDataLakeUpload self.adls_file_path = adls_file_path self.output_file_extension = output_file_extension @@ -98,6 +105,13 @@ def gen_flow(self) -> Flow: dfs = apply_map(self.gen_outlook_df, self.mailbox_list, flow=self) df = union_dfs_task.bind(dfs, flow=self) + + if self.validate_df_dict: + validation_task = validate_df.bind( + df, tests=self.validate_df_dict, flow=self + ) + validation_task.set_upstream(df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) if self.output_file_extension == ".parquet":