diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index 2a2d6adb6..5bca3cc8e 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -7,6 +7,7 @@ from typing import Literal from prefect.backend import set_key_value from prefect.utilities import logging +from prefect.engine.state import Finished from viadot.task_utils import ( add_ingestion_metadata_task, @@ -189,9 +190,20 @@ def slugify(name): @task(slug="check_df") -def check_if_df_empty(df): +def check_if_df_empty(df, if_no_data_returned: str = "skip"): + # -> to task.utils + class NoDataReturnedError(Exception): + def __init__(self, message): + self.message = message + if len(df.index) == 0: - logger.info("No data in the response. Df empty.") + if if_no_data_returned == "skip": + logger.info("No data in the source response. Df empty.") + elif if_no_data_returned == "warn": + logger.warning("No data in the source response. Df empty.") + elif if_no_data_returned == "fail": + raise NoDataReturnedError("No data in the source response. Df empty.") + return True class SharepointListToADLS(Flow): @@ -214,7 +226,7 @@ def __init__( output_file_extension: str = ".parquet", validate_df_dict: dict = None, set_prefect_kv: bool = False, - if_no_data_returned: Literal["continue", "warn", "fail"] = "continue", + if_no_data_returned: Literal["skip", "warn", "fail"] = "skip", *args: List[any], **kwargs: Dict[str, Any], ): @@ -290,6 +302,7 @@ def __init__( self.vault_name = vault_name self.row_count = row_count self.validate_df_dict = validate_df_dict + self.if_no_data_returned = if_no_data_returned # AzureDataLakeUpload self.adls_dir_path = adls_dir_path @@ -339,12 +352,18 @@ def gen_flow(self) -> Flow: row_count=self.row_count, credentials_secret=self.sp_cert_credentials_secret, ) + df_empty = check_if_df_empty.bind(df, self.if_no_data_returned) + + if df_empty: + if self.if_no_data_returned == "warn": + raise Finished( + "Flow finished because there is no new data for ingestion." + ) if self.validate_df_dict: validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self) validation_task.set_upstream(df, flow=self) - check_if_df_empty.bind(df, flow=self) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) df_mapped = df_map_mixed_dtypes_for_parquet.bind(