Sharepoint orchestration code refactor (#950)
* ✨ Moved sharepoint tasks from prefect_viadot repo

* ✨ Moved sharepoint_to_redshift_spectrum flow from prefect_viadot repo

* 🔥 Cleaned up init for prefect tasks

* Added `viadot.orchestration.prefect`
Rafalz13 authored Aug 6, 2024
1 parent 22a4957 commit 0c43153
Showing 3 changed files with 72 additions and 321 deletions.
@@ -1,105 +1,15 @@
"""Flows for downloading data from Sharepoint and uploading it to AWS Redshift Spectrum.""" # noqa: W505

from pathlib import Path
from typing import Any, Literal

import pandas as pd
from prefect import flow

from viadot.orchestration.prefect.tasks import (
    df_to_redshift_spectrum,
    get_endpoint_type_from_url,
    scan_sharepoint_folder,
    sharepoint_to_df,
    validate_and_reorder_dfs_columns,
)


def load_data_from_sharepoint(
    file_sheet_mapping: dict | None,
    download_all_files: bool,
    sharepoint_url: str,
    sheet_name: str | list[str | int] | int | None = None,
    columns: str | list[str] | list[int] | None = None,
    na_values: list[str] | None = None,
    credentials_config: dict[str, Any] | None = None,
) -> dict:
    """Loads data from SharePoint and returns it as a dictionary of DataFrames.

    This function fetches data from SharePoint based on the provided file-sheet
    mapping or by downloading all files in a specified folder. It returns the
    data as a dictionary where keys are filenames and values are the respective
    DataFrames.

    Args:
        file_sheet_mapping (dict): A dictionary where keys are filenames and values are
            the sheet names to be loaded from each file. If provided, only these files
            and sheets will be downloaded.
        download_all_files (bool): A flag indicating whether to download all files from
            the SharePoint folder specified by the `sharepoint_url`. This is ignored if
            `file_sheet_mapping` is provided.
        sharepoint_url (str): The base URL of the SharePoint site or folder.
        sheet_name (str): The name of the sheet to load if `file_sheet_mapping` is not
            provided. This is used when downloading all files.
        columns (str | list[str] | list[int], optional): Which columns to ingest.
            Defaults to None.
        na_values (list[str] | None): Additional strings to recognize as NA/NaN.
            If list passed, the specific NA values for each column will be recognized.
            Defaults to None.
        credentials_config (dict, optional): A dictionary containing credentials and
            configuration for SharePoint. Defaults to None.

    Returns:
        dict: A dictionary where keys are filenames and values are DataFrames containing
            the data from the corresponding sheets.

    Options:
        - If `file_sheet_mapping` is provided, only the specified files and sheets are
          downloaded.
        - If `download_all_files` is True, all files in the specified SharePoint folder
          are downloaded and the data is loaded from the specified `sheet_name`.
    """
    dataframes_dict = {}
    credentials_secret = (
        credentials_config.get("secret") if credentials_config else None
    )
    credentials = credentials_config.get("credentials") if credentials_config else None
    config_key = credentials_config.get("config_key") if credentials_config else None

    if file_sheet_mapping:
        for file, sheet in file_sheet_mapping.items():
            df = sharepoint_to_df(
                url=sharepoint_url + file,
                sheet_name=sheet,
                columns=columns,
                na_values=na_values,
                credentials_secret=credentials_secret,
                credentials=credentials,
                config_key=config_key,
            )
            dataframes_dict[file] = df
    elif download_all_files:
        list_of_urls = scan_sharepoint_folder(
            url=sharepoint_url,
            credentials_secret=credentials_secret,
            credentials=credentials,
            config_key=config_key,
        )
        for file_url in list_of_urls:
            df = sharepoint_to_df(
                url=file_url,
                sheet_name=sheet_name,
                columns=columns,
                na_values=na_values,
                credentials_secret=credentials_secret,
                credentials=credentials,
                config_key=config_key,
            )
            file_name = Path(file_url).stem + Path(file_url).suffix
            dataframes_dict[file_name] = df
    return dataframes_dict


@flow(
name="extract--sharepoint--redshift_spectrum",
description="Extract data from Sharepoint and load it into AWS Redshift Spectrum.",
@@ -127,14 +37,22 @@ def sharepoint_to_redshift_spectrum(  # noqa: PLR0913, PLR0917
    sharepoint_config_key: str | None = None,
    sharepoint_credentials: dict[str, Any] | None = None,
    file_sheet_mapping: dict | None = None,
    download_all_files: bool = False,
    return_as_one_table: bool = False,
) -> None:
"""Extract data from SharePoint and load it into AWS Redshift Spectrum.
This function downloads data from SharePoint and uploads it to AWS Redshift
Spectrum, either as a single table or as multiple tables depending on the provided
parameters.
This function downloads data either from SharePoint file or the whole directory and
uploads it to AWS Redshift Spectrum.
Modes:
If the `URL` ends with the file (e.g ../file.xlsx) it downloads only the file and
creates a table from it.
If the `URL` ends with the folder (e.g ../folder_name/): it downloads multiple files
and creates a table from them:
- If `file_sheet_mapping` is provided, it downloads and processes only
the specified files and sheets.
- If `file_sheet_mapping` is NOT provided, it downloads and processes all of
the files from the chosen folder.
Args:
sharepoint_url (str): The URL to the file.
@@ -175,106 +93,32 @@ def sharepoint_to_redshift_spectrum(  # noqa: PLR0913, PLR0917
            relevant credentials. Defaults to None.
        sharepoint_credentials (dict, optional): Credentials to Sharepoint. Defaults to
            None.
        file_sheet_mapping (dict, optional): Dictionary mapping each file that should
            be downloaded to its sheet. If the parameter is provided, only data from
            this dictionary is downloaded. Defaults to an empty dictionary.
        download_all_files (bool, optional): Whether to download all files from
            the folder. Defaults to False.
        return_as_one_table (bool, optional): Whether to load the data into a single
            table. Defaults to False.

    The function operates in two main modes:
    1. If `file_sheet_mapping` is provided, it downloads and processes only
       the specified files and sheets.
    2. If `download_all_files` is True, it scans the SharePoint folder for all files
       and processes them.

    Additionally, depending on the value of `return_as_one_table`, the data is either
    combined into a single table or uploaded as multiple tables.

        file_sheet_mapping (dict): A dictionary where keys are filenames and values are
            the sheet names to be loaded from each file. If provided, only these files
            and sheets will be downloaded. Defaults to None.
    """
    sharepoint_credentials_config = {
        "secret": sharepoint_credentials_secret,
        "credentials": sharepoint_credentials,
        "config_key": sharepoint_config_key,
    }
    endpoint_type = get_endpoint_type_from_url(url=sharepoint_url)

    if endpoint_type == "file":
        df = sharepoint_to_df(
            url=sharepoint_url,
            sheet_name=sheet_name,
            columns=columns,
            na_values=na_values,
            credentials_secret=sharepoint_credentials_secret,
            config_key=sharepoint_config_key,
            credentials=sharepoint_credentials,
        )

        df_to_redshift_spectrum(
            df=df,
            to_path=to_path,
            schema_name=schema_name,
            table=table,
            extension=extension,
            if_exists=if_exists,
            partition_cols=partition_cols,
            index=index,
            compression=compression,
            sep=sep,
            description=description,
            credentials=aws_credentials,
            config_key=aws_config_key,
        )
    else:
        dataframes_dict = load_data_from_sharepoint(
            file_sheet_mapping=file_sheet_mapping,
            download_all_files=download_all_files,
            sharepoint_url=sharepoint_url,
            sheet_name=sheet_name,
            columns=columns,
            na_values=na_values,
            credentials_config=sharepoint_credentials_config,
        )

        if return_as_one_table is True:
            dataframes_list = list(dataframes_dict.values())
            validated_and_reordered_dfs = validate_and_reorder_dfs_columns(
                dataframes_list=dataframes_list
            )
            final_df = pd.concat(validated_and_reordered_dfs, ignore_index=True)

            df_to_redshift_spectrum(
                df=final_df,
                to_path=to_path,
                schema_name=schema_name,
                table=table,
                extension=extension,
                if_exists=if_exists,
                partition_cols=partition_cols,
                index=index,
                compression=compression,
                sep=sep,
                description=description,
                credentials=aws_credentials,
                config_key=aws_config_key,
            )

        elif return_as_one_table is False:
            for file_name, df in dataframes_dict.items():
                file_name_without_extension = Path(file_name).stem
                df_to_redshift_spectrum(
                    df=df,
                    to_path=f"{to_path}_{file_name_without_extension}",  # to test
                    schema_name=schema_name,
                    table=f"{table}_{file_name_without_extension}",
                    extension=extension,
                    if_exists=if_exists,
                    partition_cols=partition_cols,
                    index=index,
                    compression=compression,
                    sep=sep,
                    description=description,
                    credentials=aws_credentials,
                    config_key=aws_config_key,
                )
    df = sharepoint_to_df(
        url=sharepoint_url,
        sheet_name=sheet_name,
        columns=columns,
        na_values=na_values,
        file_sheet_mapping=file_sheet_mapping,
        credentials_secret=sharepoint_credentials_secret,
        config_key=sharepoint_config_key,
        credentials=sharepoint_credentials,
    )
    df_to_redshift_spectrum(
        df=df,
        to_path=to_path,
        schema_name=schema_name,
        table=table,
        extension=extension,
        if_exists=if_exists,
        partition_cols=partition_cols,
        index=index,
        compression=compression,
        sep=sep,
        description=description,
        credentials=aws_credentials,
        config_key=aws_config_key,
    )
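
The refactored body above delegates both the single-file and the folder cases to `sharepoint_to_df` via its `file_sheet_mapping` parameter. A minimal sketch of invoking the refactored flow follows; every literal value (URL, S3 path, schema, table, mapping, secret and config names) is an assumption for illustration, not taken from the repository. The keyword names themselves are the parameters visible in the diff.

sharepoint_to_redshift_spectrum(
    sharepoint_url="https://example.sharepoint.com/sites/team/reports/",  # assumed folder URL
    to_path="s3://example-bucket/sharepoint/reports",  # assumed S3 prefix
    schema_name="staging",  # assumed external schema
    table="sharepoint_reports",  # assumed table name
    file_sheet_mapping={"q1.xlsx": "Summary", "q2.xlsx": "Summary"},  # assumed mapping
    sharepoint_credentials_secret="sharepoint-credentials",  # assumed secret name
    aws_config_key="aws",  # assumed config key
)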
11 changes: 4 additions & 7 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -17,12 +17,9 @@
from .redshift_spectrum import df_to_redshift_spectrum # noqa: F401
from .s3 import s3_upload_file # noqa: F401
from .sap_rfc import sap_rfc_to_df # noqa: F401
from .sharepoint import get_endpoint_type_from_url # noqa: F401
from .sharepoint import scan_sharepoint_folder # noqa: F401
from .sharepoint import sharepoint_download_file # noqa: F401
from .sharepoint import sharepoint_to_df # noqa: F401
from .sharepoint import validate_and_reorder_dfs_columns # noqa: F401
from .sharepoint import (
    sharepoint_download_file,  # noqa: F401
    sharepoint_to_df,  # noqa: F401
)
from .sql_server import (create_sql_server_table, sql_server_query,
                         sql_server_to_df)
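
With the cleaned-up `__init__`, the SharePoint tasks are re-exported from a single package path. A short usage sketch under stated assumptions (it presumes viadot is installed with its Prefect dependencies and that the names above stay re-exported); the flow name, URL, and secret name are hypothetical.

from prefect import flow

from viadot.orchestration.prefect.tasks import sharepoint_to_df


@flow
def example_sharepoint_extract():
    # Assumed file URL and secret name, for illustration only.
    return sharepoint_to_df(
        url="https://example.sharepoint.com/sites/team/docs/file.xlsx",
        credentials_secret="sharepoint-credentials",
    )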


