Skip to content

Commit

Permalink
Modified logic for check df. Df check and flow Finish in the flow added
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinpurtak committed Nov 29, 2023
1 parent c4f07df commit 34859ed
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions viadot/flows/sharepoint_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Literal
from prefect.backend import set_key_value
from prefect.utilities import logging
from prefect.engine.state import Finished

from viadot.task_utils import (
add_ingestion_metadata_task,
Expand Down Expand Up @@ -189,9 +190,20 @@ def slugify(name):


@task(slug="check_df")
def check_if_df_empty(df):
def check_if_df_empty(df, if_no_data_returned: str = "skip"):
# -> to task.utils
class NoDataReturnedError(Exception):
def __init__(self, message):
self.message = message

if len(df.index) == 0:
logger.info("No data in the response. Df empty.")
if if_no_data_returned == "skip":
logger.info("No data in the source response. Df empty.")
elif if_no_data_returned == "warn":
logger.warning("No data in the source response. Df empty.")
elif if_no_data_returned == "fail":
raise NoDataReturnedError("No data in the source response. Df empty.")
return True


class SharepointListToADLS(Flow):
Expand All @@ -214,7 +226,7 @@ def __init__(
output_file_extension: str = ".parquet",
validate_df_dict: dict = None,
set_prefect_kv: bool = False,
if_no_data_returned: Literal["continue", "warn", "fail"] = "continue",
if_no_data_returned: Literal["skip", "warn", "fail"] = "skip",
*args: List[any],
**kwargs: Dict[str, Any],
):
Expand Down Expand Up @@ -290,6 +302,7 @@ def __init__(
self.vault_name = vault_name
self.row_count = row_count
self.validate_df_dict = validate_df_dict
self.if_no_data_returned = if_no_data_returned

# AzureDataLakeUpload
self.adls_dir_path = adls_dir_path
Expand Down Expand Up @@ -339,12 +352,18 @@ def gen_flow(self) -> Flow:
row_count=self.row_count,
credentials_secret=self.sp_cert_credentials_secret,
)
df_empty = check_if_df_empty.bind(df, self.if_no_data_returned)

if df_empty:
if self.if_no_data_returned == "warn":
raise Finished(
"Flow finished because there is no new data for ingestion."
)

if self.validate_df_dict:
validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
validation_task.set_upstream(df, flow=self)

check_if_df_empty.bind(df, flow=self)
df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
df_mapped = df_map_mixed_dtypes_for_parquet.bind(
Expand Down

0 comments on commit 34859ed

Please sign in to comment.