update for sharepoint list to df with function for checking df
marcinpurtak committed Dec 1, 2023
1 parent 34859ed commit c197fb2
Showing 1 changed file with 67 additions and 58 deletions.
125 changes: 67 additions & 58 deletions viadot/flows/sharepoint_to_adls.py
@@ -3,11 +3,12 @@
 from typing import Any, Dict, List
 
 import pendulum
-from prefect import Flow, task
+from prefect import Flow, task, case
 from prefect.engine.state import Failed
 from prefect.engine.runner import ENDRUN
 from typing import Literal
 from prefect.backend import set_key_value
 from prefect.utilities import logging
+from prefect.engine.state import Finished
 
 from viadot.task_utils import (
     add_ingestion_metadata_task,
@@ -192,18 +193,25 @@ def slugify(name):
 @task(slug="check_df")
 def check_if_df_empty(df, if_no_data_returned: str = "skip"):
     # -> to task.utils
-    class NoDataReturnedError(Exception):
+    class NoDataReturnedError(BaseException):
         def __init__(self, message):
             self.message = message
 
-    if len(df.index) == 0:
-        if if_no_data_returned == "skip":
-            logger.info("No data in the source response. Df empty.")
-        elif if_no_data_returned == "warn":
+    if df.empty:
+        if if_no_data_returned == "warn":
             logger.warning("No data in the source response. Df empty.")
+            return True
             # raise ENDRUN(state=Failed("Failed task raised"))
         elif if_no_data_returned == "fail":
-            raise NoDataReturnedError("No data in the source response. Df empty.")
+            return True
+            raise NoDataReturnedError("No data in the source response. Df empty...")
+
+
+@task
+def check_status(status):
+    if status:
+        logger.info("inside df empty")
+        if self.if_no_data_returned == "warn":
+            raise Finished("Flow finished because there is no new data for ingestion.")
 
 
 class SharepointListToADLS(Flow):
@@ -352,63 +360,64 @@ def gen_flow(self) -> Flow:
             row_count=self.row_count,
             credentials_secret=self.sp_cert_credentials_secret,
         )
-        df_empty = check_if_df_empty.bind(df, self.if_no_data_returned)
-
-        if df_empty:
-            if self.if_no_data_returned == "warn":
-                raise Finished(
-                    "Flow finished because there is no new data for ingestion."
-                )
-
-        if self.validate_df_dict:
-            validation_task = validate_df(df=df, tests=self.validate_df_dict, flow=self)
-            validation_task.set_upstream(df, flow=self)
-
-        df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
-        dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
-        df_mapped = df_map_mixed_dtypes_for_parquet.bind(
-            df_with_metadata, dtypes_dict, flow=self
-        )
+        if self.if_no_data_returned != "skip":
+            df_empty = check_if_df_empty.bind(df, self.if_no_data_returned, flow=self)
+            # If df empty there is no reason to run other tasks
+        else:
+            df_empty = False
 
-        df_to_file = df_to_parquet.bind(
-            df=df_mapped,
-            path=self.path,
-            flow=self,
-        )
+        with case(df_empty, False):
+            if self.validate_df_dict:
+                validation_task = validate_df(
+                    df=df, tests=self.validate_df_dict, flow=self
+                )
+                validation_task.set_upstream(df, flow=self)
 
-        file_to_adls_task = AzureDataLakeUpload()
-        file_to_adls_task.bind(
-            from_path=self.path,
-            to_path=self.adls_dir_path,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
+            df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
+            dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
+            df_mapped = df_map_mixed_dtypes_for_parquet.bind(
+                df_with_metadata, dtypes_dict, flow=self
+            )
 
-        dtypes_to_json_task.bind(
-            dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
-        )
+            df_to_file = df_to_parquet.bind(
+                df=df_mapped,
+                path=self.path,
+                flow=self,
+            )
 
-        json_to_adls_task = AzureDataLakeUpload()
-        json_to_adls_task.bind(
-            from_path=self.local_json_path,
-            to_path=self.adls_schema_file_dir_file,
-            overwrite=self.overwrite,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            flow=self,
-        )
+            file_to_adls_task = AzureDataLakeUpload()
+            file_to_adls_task.bind(
+                from_path=self.path,
+                to_path=self.adls_dir_path,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )
 
-        if self.validate_df_dict:
-            df_with_metadata.set_upstream(validation_task, flow=self)
+            dtypes_to_json_task.bind(
+                dtypes_dict=dtypes_dict, local_json_path=self.local_json_path, flow=self
+            )
 
-        df_mapped.set_upstream(df_with_metadata, flow=self)
-        dtypes_to_json_task.set_upstream(df_mapped, flow=self)
-        df_to_file.set_upstream(dtypes_to_json_task, flow=self)
+            json_to_adls_task = AzureDataLakeUpload()
+            json_to_adls_task.bind(
+                from_path=self.local_json_path,
+                to_path=self.adls_schema_file_dir_file,
+                overwrite=self.overwrite,
+                sp_credentials_secret=self.adls_sp_credentials_secret,
+                flow=self,
+            )
 
-        file_to_adls_task.set_upstream(df_to_file, flow=self)
-        json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
-        if self.set_prefect_kv == True:
-            set_key_value(key=self.adls_dir_path, value=self.adls_file_path)
+            if self.validate_df_dict:
+                df_with_metadata.set_upstream(validation_task, flow=self)
+            dtypes_dict.set_upstream(df_with_metadata, flow=self)
+            df_mapped.set_upstream(df_with_metadata, flow=self)
+            dtypes_to_json_task.set_upstream(df_mapped, flow=self)
+            df_to_file.set_upstream(dtypes_to_json_task, flow=self)
+            file_to_adls_task.set_upstream(df_to_file, flow=self)
+            json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
+            if self.set_prefect_kv == True:
+                set_key_value(key=self.adls_dir_path, value=self.adls_file_path)

     @staticmethod
     def slugify(name):
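Note on the rewritten check task: it replaces len(df.index) == 0 with the equivalent but more idiomatic df.empty and routes an empty source response through three policies ("skip", "warn", "fail"). It also re-bases the custom error on BaseException, which generic except Exception handlers will not catch. A minimal standalone sketch of the same policy logic, with the Prefect wiring stripped and a plain ValueError standing in for the commit's NoDataReturnedError:

import logging

import pandas as pd

logger = logging.getLogger(__name__)


def check_if_df_empty(df: pd.DataFrame, if_no_data_returned: str = "skip") -> bool:
    """Return True when df is empty, applying the chosen policy."""
    if df.empty:  # same test as len(df.index) == 0
        if if_no_data_returned == "warn":
            logger.warning("No data in the source response. Df empty.")
        elif if_no_data_returned == "fail":
            raise ValueError("No data in the source response. Df empty.")
        return True  # "skip" and "warn" both just report emptiness
    return False


check_if_df_empty(pd.DataFrame(), if_no_data_returned="warn")  # logs, returns True
check_if_df_empty(pd.DataFrame({"a": [1]}))                    # returns False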
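The gen_flow change swaps the old if df_empty: branch, which ran at flow-build time against the unresolved Task object rather than against its runtime result, for Prefect 1.x's case context manager: tasks registered inside with case(df_empty, False): only run when df_empty resolves to False at execution time. A minimal sketch of the pattern, with illustrative task names rather than the flow's real tasks:

import pandas as pd
from prefect import Flow, task, case


@task
def extract() -> pd.DataFrame:
    return pd.DataFrame()  # pretend the source returned no rows


@task
def check_if_df_empty(df: pd.DataFrame) -> bool:
    return df.empty


@task
def load(df: pd.DataFrame) -> None:
    print(f"loading {len(df)} rows")


with Flow("conditional-load") as flow:
    df = extract()
    df_empty = check_if_df_empty(df)
    with case(df_empty, False):  # registered, but skipped when df_empty is True
        load(df)

flow.run()  # load ends up Skipped because the frame is empty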
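The imports also carry Failed, ENDRUN, and Finished, and the check task keeps a commented-out raise ENDRUN(state=Failed(...)): in Prefect 1.x, raising ENDRUN inside a task ends that task run immediately with the supplied state. A short sketch of that idiom, with an illustrative task and message, using Skipped where the commit's comment uses Failed:

import pandas as pd
from prefect import Flow, task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import Skipped


@task
def require_data(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        # End this task run right away with a Skipped state;
        # state=Failed("...") would mark the run as failed instead.
        raise ENDRUN(state=Skipped("No new data for ingestion."))
    return df


with Flow("early-exit") as flow:
    require_data(pd.DataFrame())

flow.run()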
