Merge pull request dyvenia#788 from jkrobicki/outlook_validate_df
Add `validate_df` task to `OutlookToADLS` flow
m-paz authored Oct 26, 2023
2 parents a293f0f + 34e40cb commit 6d857b5
Showing 1 changed file with 14 additions and 0 deletions.
viadot/flows/outlook_to_adls.py: 14 additions, 0 deletions
@@ -9,6 +9,7 @@
     df_to_csv,
     df_to_parquet,
     union_dfs_task,
+    validate_df,
 )
 from viadot.tasks import AzureDataLakeUpload, OutlookToDF
 
@@ -29,6 +30,7 @@ def __init__(
         limit: int = 10000,
         timeout: int = 3600,
         if_exists: Literal["append", "replace", "skip"] = "append",
+        validate_df_dict: dict = None,
         outlook_credentials_secret: str = "OUTLOOK",
         *args: List[Any],
         **kwargs: Dict[str, Any],
@@ -54,6 +56,8 @@
             timeout(int, optional): The amount of time (in seconds) to wait while running this task before
                 a timeout occurs. Defaults to 3600.
             if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append".
+            validate_df_dict (dict, optional): An optional dictionary to verify the received dataframe.
+                When passed, `validate_df` task validation tests are triggered. Defaults to None.
         """
 
         self.mailbox_list = mailbox_list
@@ -65,6 +69,9 @@
         self.local_file_path = local_file_path
         self.if_exsists = if_exists
 
+        # Validate DataFrame
+        self.validate_df_dict = validate_df_dict
+
         # AzureDataLakeUpload
         self.adls_file_path = adls_file_path
         self.output_file_extension = output_file_extension
@@ -98,6 +105,13 @@ def gen_flow(self) -> Flow:
         dfs = apply_map(self.gen_outlook_df, self.mailbox_list, flow=self)
 
         df = union_dfs_task.bind(dfs, flow=self)
+
+        if self.validate_df_dict:
+            validation_task = validate_df.bind(
+                df, tests=self.validate_df_dict, flow=self
+            )
+            validation_task.set_upstream(df, flow=self)
+
         df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self)
 
         if self.output_file_extension == ".parquet":
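For illustration, a sketch of how a caller might use the new parameter when building the flow. The flow name, mailbox, file paths, and the test keys inside validate_df_dict are assumptions made up for this example, not values from the commit; the supported test names are defined by the validate_df task in viadot.task_utils.

from viadot.flows import OutlookToADLS

flow = OutlookToADLS(
    name="outlook_extract_with_validation",  # hypothetical flow name
    mailbox_list=["sales.team@example.com"],  # hypothetical mailbox
    local_file_path="outlook_extract.parquet",  # hypothetical local path
    adls_file_path="raw/outlook/outlook_extract.parquet",  # hypothetical ADLS path
    output_file_extension=".parquet",
    # New in this commit: when validate_df_dict is not None, gen_flow() binds
    # the validate_df task with tests=validate_df_dict against the unioned DataFrame.
    validate_df_dict={
        "dataset_row_count": {"min": 1},  # assumed test key
        "column_list_to_match": ["subject", "sender"],  # assumed test key
    },
)

# flow.run()

Leaving validate_df_dict at its default of None skips the validation branch in gen_flow() entirely, so existing pipelines are unaffected.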

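The gen_flow() addition uses the same Prefect (1.x) functional API as the rest of the flow: tasks are attached with .bind(..., flow=...) and extra ordering constraints are added with .set_upstream(..., flow=...). Below is a minimal, self-contained sketch of that pattern; the tasks and data are invented for the example and are not part of viadot.

from prefect import Flow, task


@task
def build_records():
    # Stand-in for the tasks that produce the combined DataFrame in OutlookToADLS.
    return [{"subject": "hello", "sender": "someone@example.com"}]


@task
def validate(records, tests=None):
    # Stand-in for viadot's validate_df task; here it only checks non-emptiness.
    if tests and not records:
        raise ValueError("validation failed: empty result")


@task
def add_metadata(records):
    # Stand-in for add_ingestion_metadata_task.
    return [{**r, "_ingested": True} for r in records]


flow = Flow("bind_set_upstream_sketch")
records = build_records.bind(flow=flow)
validation = validate.bind(records, tests={"non_empty": True}, flow=flow)
# Mirror the commit: add an explicit upstream edge on top of the data
# dependency, like validation_task.set_upstream(df, flow=self) in gen_flow().
validation.set_upstream(records, flow=flow)
enriched = add_metadata.bind(records, flow=flow)

# flow.run()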