diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py
index 5bca3cc8e..bac4053b9 100644
--- a/viadot/flows/sharepoint_to_adls.py
+++ b/viadot/flows/sharepoint_to_adls.py
@@ -3,11 +3,12 @@
 from typing import Any, Dict, List

 import pendulum
-from prefect import Flow, task
+from prefect import Flow, task, case
+from prefect.engine.state import Failed
+from prefect.engine.runner import ENDRUN
 from typing import Literal
 from prefect.backend import set_key_value
 from prefect.utilities import logging
-from prefect.engine.state import Finished

 from viadot.task_utils import (
     add_ingestion_metadata_task,
@@ -192,18 +193,25 @@ def slugify(name):

 @task(slug="check_df")
 def check_if_df_empty(df, if_no_data_returned: str = "skip"):  # -> to task.utils
-    class NoDataReturnedError(Exception):
+    class NoDataReturnedError(BaseException):
         def __init__(self, message):
             self.message = message

-    if len(df.index) == 0:
-        if if_no_data_returned == "skip":
-            logger.info("No data in the source response. Df empty.")
-        elif if_no_data_returned == "warn":
+    if df.empty:
+        if if_no_data_returned == "warn":
             logger.warning("No data in the source response. Df empty.")
+            return True
+            # raise ENDRUN(state=Failed("Failed task raised"))
         elif if_no_data_returned == "fail":
-            raise NoDataReturnedError("No data in the source response. Df empty.")
-        return True
+            raise NoDataReturnedError("No data in the source response. Df empty...")
+
+
+@task
+def check_status(status):
+    if status:
+        logger.info("inside df empty")
+        if self.if_no_data_returned == "warn":
+            raise Finished("Flow finished because there is no new data for ingestion.")


 class SharepointListToADLS(Flow):
@@ -352,63 +360,64 @@ def gen_flow(self) -> Flow:
             row_count=self.row_count,
             credentials_secret=self.sp_cert_credentials_secret,
         )

-        df_empty = check_if_df_empty.bind(df, self.if_no_data_returned)
-
-        if df_empty:
-            if self.if_no_data_returned == "warn":
-                raise Finished(
-                    "Flow finished because there is no new data for ingestion."
-                )
-        if self.validate_df_dict:
-            validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
-            validation_task.set_upstream(df, flow=self)
-
-        df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
-        dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
-        df_mapped = df_map_mixed_dtypes_for_parquet.bind(
-            df_with_metadata, dtypes_dict, flow=self
-        )
+        if self.if_no_data_returned != "skip":
+            df_empty = check_if_df_empty.bind(df, self.if_no_data_returned, flow=self)
+            # If df empty there is no reason to run other tasks
+        else:
+            df_empty = False

-        df_to_file = df_to_parquet.bind(
-            df=df_mapped,
-            path=self.path,
-            flow=self,
-        )
+        with case(df_empty, False):
+            if self.validate_df_dict:
+                validation_task = validate_df(
+                    df=df, tests=self.validate_df_dict, flow=self
+                )
+                validation_task.set_upstream(df, flow=self)

-        file_to_adls_task = AzureDataLakeUpload()
-        file_to_adls_task.bind(
-            from_path=self.path,
-            to_path=self.adls_dir_path,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
+            df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
+            dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
+            df_mapped = df_map_mixed_dtypes_for_parquet.bind(
+                df_with_metadata, dtypes_dict, flow=self
+            )

-        dtypes_to_json_task.bind(
-            dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
-        )
+            df_to_file = df_to_parquet.bind(
+                df=df_mapped,
+                path=self.path,
+                flow=self,
+            )

-        json_to_adls_task = AzureDataLakeUpload()
-        json_to_adls_task.bind(
-            from_path=self.local_json_path,
-            to_path=self.adls_schema_file_dir_file,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
+            file_to_adls_task = AzureDataLakeUpload()
+            file_to_adls_task.bind(
+                from_path=self.path,
+                to_path=self.adls_dir_path,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )

-        if self.validate_df_dict:
-            df_with_metadata.set_upstream(validation_task, flow=self)
+            dtypes_to_json_task.bind(
+                dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
+            )

-        df_mapped.set_upstream(df_with_metadata, flow=self)
-        dtypes_to_json_task.set_upstream(df_mapped, flow=self)
-        df_to_file.set_upstream(dtypes_to_json_task, flow=self)
+            json_to_adls_task = AzureDataLakeUpload()
+            json_to_adls_task.bind(
+                from_path=self.local_json_path,
+                to_path=self.adls_schema_file_dir_file,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )

-        file_to_adls_task.set_upstream(df_to_file, flow=self)
-        json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
-        if self.set_prefect_kv == True:
-            set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
+            if self.validate_df_dict:
+                df_with_metadata.set_upstream(validation_task, flow=self)
+            dtypes_dict.set_upstream(df_with_metadata, flow=self)
+            df_mapped.set_upstream(df_with_metadata, flow=self)
+            dtypes_to_json_task.set_upstream(df_mapped, flow=self)
+            df_to_file.set_upstream(dtypes_to_json_task, flow=self)
+            file_to_adls_task.set_upstream(df_to_file, flow=self)
+            json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
+            if self.set_prefect_kv == True:
+                set_key_value(key=self.adls_dir_path, value=self.adls_file_path)

     @staticmethod
     def slugify(name):
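
Note on the gating mechanism used above (illustrative, not part of the patch): in Prefect 1.x, tasks created inside a `case(task, value)` block run only when the result of `task` compares equal to `value`; otherwise they finish in a Skipped state. A minimal, self-contained sketch of that pattern, with made-up task and flow names:

# Minimal sketch of prefect 1.x `case` gating, the mechanism gen_flow relies on.
# `df_is_empty`, `downstream_work`, and "case-gating-sketch" are illustrative names.
from prefect import Flow, case, task


@task
def df_is_empty() -> bool:
    # Stand-in for check_if_df_empty: returns True when there is no data.
    return False


@task
def downstream_work():
    print("running downstream tasks")


with Flow("case-gating-sketch") as flow:
    empty = df_is_empty()
    # Tasks bound inside this block are skipped unless `empty` equals False.
    with case(empty, False):
        downstream_work()

if __name__ == "__main__":
    flow.run()  # downstream_work is skipped whenever df_is_empty returns True

Running this sketch skips downstream_work whenever df_is_empty returns True, which mirrors how the reworked gen_flow short-circuits the parquet and ADLS upload tasks when the Sharepoint list returns no rows.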
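
The task wiring in gen_flow uses Prefect 1.x's imperative API rather than a `with Flow(...)` context: `.bind(..., flow=self)` registers a task and its arguments on the flow, and `.set_upstream(..., flow=self)` adds explicit ordering edges. A small sketch of the same pattern, again with illustrative task names only:

# Minimal sketch of prefect 1.x imperative wiring with .bind() and .set_upstream(),
# the same pattern gen_flow uses with flow=self. `extract` and `load` are made up.
from prefect import Flow, task


@task
def extract():
    return [1, 2, 3]


@task
def load(data):
    print(f"loading {data}")


flow = Flow("imperative-wiring-sketch")
extract.bind(flow=flow)                # register extract (takes no arguments)
load.bind(data=extract, flow=flow)     # register load; passing the task creates a data edge
load.set_upstream(extract, flow=flow)  # explicit ordering edge (redundant with the data edge)

if __name__ == "__main__":
    flow.run()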