Refactoring
giannibalistreri committed Dec 27, 2023
1 parent 35aecc3 commit 5927d19
Showing 2 changed files with 82 additions and 27 deletions.
@@ -19,11 +19,11 @@
 PARSER.add_argument('-analytical_data_types', type=Any, required=True, default=None, help='assignment of features to analytical data types')
 PARSER.add_argument('-missing_value_threshold', type=float, required=False, default=0.95, help='threshold to classify features as invalid based on the amount of missing values')
 PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
-PARSER.add_argument('-output_file_path_data_missing_data', type=str, required=True, default=None, help='file path of features containing too much missing data')
-PARSER.add_argument('-output_file_path_invariant', type=str, required=True, default=None, help='file path of invariant features')
-PARSER.add_argument('-output_file_path_duplicated', type=str, required=True, default=None, help='file path of duplicated features')
-PARSER.add_argument('-output_file_path_valid_features', type=str, required=True, default=None, help='file path of valid features')
-PARSER.add_argument('-output_file_path_prop_valid_features', type=str, required=True, default=None, help='file path of the proportion of valid features')
+PARSER.add_argument('-output_file_path_data_missing_data', type=str, required=True, default=None, help='file path of features containing too much missing data output')
+PARSER.add_argument('-output_file_path_invariant', type=str, required=True, default=None, help='file path of invariant features output')
+PARSER.add_argument('-output_file_path_duplicated', type=str, required=True, default=None, help='file path of duplicated features output')
+PARSER.add_argument('-output_file_path_valid_features', type=str, required=True, default=None, help='file path of valid features output')
+PARSER.add_argument('-output_file_path_prop_valid_features', type=str, required=True, default=None, help='file path of the proportion of valid features output')
 PARSER.add_argument('-s3_output_file_path_data_health_check', type=str, required=False, default=None, help='S3 file path of the data health check output')
 ARGS = PARSER.parse_args()
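The flags above use single-dash long options, which argparse accepts the same way as double-dash ones. A minimal, self-contained sketch of that convention (this demo parser and its values are illustrative, not part of the component):

import argparse

# Illustrative parser mirroring the single-dash option style used above;
# option names and values are made up for the demo.
demo_parser = argparse.ArgumentParser(description='single-dash option demo')
demo_parser.add_argument('-output_file_path_invariant', type=str, required=True, help='file path of invariant features output')
demo_parser.add_argument('-missing_value_threshold', type=float, required=False, default=0.95, help='missing value threshold')
demo_args = demo_parser.parse_args(['-output_file_path_invariant', 'invariant_features.json'])
assert demo_args.output_file_path_invariant == 'invariant_features.json'
assert demo_args.missing_value_threshold == 0.95  # default applied and cast to float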

@@ -55,19 +55,19 @@ def data_health_check(data_set_path: str,
         Assigned analytical data types to each feature
     :param output_file_path_missing_data: str
-        Path of the features containing too much missing data
+        Path of the features containing too much missing data output
     :param output_file_path_invariant: str
-        Path of the invariant features
+        Path of the invariant features output
     :param output_file_path_duplicated: str
-        Path of the duplicated features
+        Path of the duplicated features output
     :param output_file_path_valid_features: str
-        Path of the valid features
+        Path of the valid features output
     :param output_file_path_prop_valid_features: str
-        Path of the proportion of valid features
+        Path of the proportion of valid features output
     :param missing_value_threshold: float
         Threshold of missing values to exclude numeric feature
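Taken together, these renames make clear that every path flag names an output artifact of the data health check step. A hedged sketch of how the function might be called, assuming its signature matches the flags and docstring visible in this diff (the import path and all argument values are hypothetical):

from task import data_health_check  # hypothetical module path

# Hypothetical invocation; parameter names follow the docstring above,
# all values are made up for illustration.
data_health_check(data_set_path='data/train.csv',
                  analytical_data_types={'continuous': ['age'], 'categorical': ['gender']},
                  output_file_path_missing_data='missing_data_features.json',
                  output_file_path_invariant='invariant_features.json',
                  output_file_path_duplicated='duplicated_features.json',
                  output_file_path_valid_features='valid_features.json',
                  output_file_path_prop_valid_features='prop_valid_features.json',
                  missing_value_threshold=0.95,
                  sep=','
                  )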
@@ -5,24 +5,43 @@
"""

import argparse
import json

from parallelizer import distribute_cases, distribute_elements, distribute_features, distribute_file_paths, ParallelizerException
from typing import NamedTuple
from aws import save_file_to_s3
from custom_logger import Log
from file_handler import file_handler
from parallelizer import (
distribute_analytical_data_types, distribute_cases, distribute_elements, distribute_features,
distribute_file_paths, ParallelizerException
)
from typing import Any, Dict, List, NamedTuple


 PARSER = argparse.ArgumentParser(description="parallelize data")
 PARSER.add_argument('-action', type=str, required=True, default=None, help='distribution action')
+PARSER.add_argument('-analytical_data_types', type=Any, required=False, default=None, help='pre-defined analytical data types')
 PARSER.add_argument('-data_file_path', type=str, required=False, default=None, help='complete file path of the input data set')
-PARSER.add_argument('-bucket_name', type=str, required=False, default=None, help='name of the S3 bucket')
+PARSER.add_argument('-s3_bucket_name', type=str, required=False, default=None, help='name of the S3 bucket')
+PARSER.add_argument('-chunks', type=int, required=False, default=4, help='number of chunks to distribute')
+PARSER.add_argument('-persist_data', type=int, required=False, default=1, help='whether to persist distributed chunks or not')
+PARSER.add_argument('-elements', type=list, required=False, default=None, help='elements to distribute')
+PARSER.add_argument('-prefix', type=str, required=False, default=None, help='prefix used for filtering folders in S3 bucket')
 PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
+PARSER.add_argument('-output_path_distribution', type=str, required=False, default=None, help='file path of the distribution output')
+PARSER.add_argument('-s3_output_path_distribution', type=str, required=False, default=None, help='S3 file path of the distribution output')
 ARGS = PARSER.parse_args()


 def parallelizer(action: str,
+                 output_path_distribution: str,
+                 analytical_data_types: Dict[str, List[str]] = None,
                  data_file_path: str = None,
                  bucket_name: str = None,
-                 sep: str = ','
+                 chunks: int = 4,
+                 persist_data: bool = True,
+                 elements: list = None,
+                 prefix: str = None,
+                 sep: str = ',',
+                 s3_output_path_distribution: str = None
                  ) -> NamedTuple('outputs', [('distributed_values', list)]):
"""
Parallelize data
Expand All @@ -34,49 +53,85 @@ def parallelizer(action: str,
-> features: features of given data set
-> file_paths: file path in given S3 bucket
:param output_path_distribution: str
Path of the distribution output
:param analytical_data_types: dict
Assigned analytical data types to each feature
:param data_file_path: str
Complete file path of the data set
:param bucket_name: str
Name of the S3 bucket
:param chunks: int
Number of chunks to distribute
:param persist_data: bool
Whether to persist distributed chunks or not
:param elements: list
Elements to distribute
:param prefix: str
Prefix used for filtering folder in S3 bucket
:param sep: str
Separator
:param s3_output_path_distribution: str
Complete file path of the distribution output
:return: NamedTuple
Distributed values
"""
-    if action == 'cases':
+    if action == 'analytical_data_types':
+        _distributed_values: list = distribute_analytical_data_types(analytical_data_types=analytical_data_types,
+                                                                     file_path=data_file_path,
+                                                                     persist_data=persist_data,
+                                                                     sep=sep
+                                                                     )
+    elif action == 'cases':
         _distributed_values: list = distribute_cases(file_path=data_file_path,
-                                                     chunks=4,
-                                                     persist_data=True,
+                                                     chunks=chunks,
+                                                     persist_data=persist_data,
                                                      sep=sep
                                                      )
     elif action == 'elements':
-        _distributed_values: list = distribute_elements(elements=[],
-                                                        chunks=5
+        _distributed_values: list = distribute_elements(elements=elements,
+                                                        chunks=chunks
                                                         )
     elif action == 'features':
         _distributed_values: list = distribute_features(file_path=data_file_path,
-                                                        persist_data=True,
-                                                        chunks=None,
+                                                        persist_data=persist_data,
+                                                        chunks=chunks,
                                                         sep=sep
                                                         )
     elif action == 'file_paths':
-        _distributed_values: list = distribute_file_paths(chunks=4,
+        _distributed_values: list = distribute_file_paths(chunks=chunks,
                                                           bucket_name=bucket_name,
-                                                          prefix=None
+                                                          prefix=prefix
                                                           )
     else:
         raise ParallelizerException(f'Action ({action}) not supported')
-    with open('distributed_values.json', 'w') as _file:
-        json.dump(_distributed_values, _file)
+    file_handler(file_path=output_path_distribution, obj=_distributed_values)
+    if s3_output_path_distribution is not None:
+        save_file_to_s3(file_path=s3_output_path_distribution, obj=_distributed_values)
+        Log().log(msg=f'Save distribution: {s3_output_path_distribution}')
     return [_distributed_values]


 if __name__ == '__main__':
     parallelizer(action=ARGS.action,
+                 output_path_distribution=ARGS.output_path_distribution,
+                 analytical_data_types=ARGS.analytical_data_types,
                  data_file_path=ARGS.data_file_path,
-                 bucket_name=ARGS.bucket_name,
-                 sep=ARGS.sep
+                 bucket_name=ARGS.s3_bucket_name,
+                 chunks=ARGS.chunks,
+                 persist_data=ARGS.persist_data,
+                 elements=ARGS.elements,
+                 prefix=ARGS.prefix,
+                 sep=ARGS.sep,
+                 s3_output_path_distribution=ARGS.s3_output_path_distribution
                  )
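The substantive change in this file is the persistence step: the hard-coded 'distributed_values.json' dump is replaced by the project's file_handler helper plus an optional S3 upload through save_file_to_s3, with a log entry via Log. Neither helper is shown in this diff; the following is a minimal sketch of what they plausibly do, assuming JSON-serializable payloads and boto3 for S3 access, not the project's actual implementation:

import json

import boto3  # assumed dependency for the S3 upload


def file_handler(file_path: str, obj: object) -> None:
    # Sketch: persist a JSON-serializable object to a local file path.
    with open(file_path, 'w') as _file:
        json.dump(obj, _file)


def save_file_to_s3(file_path: str, obj: object) -> None:
    # Sketch: split an 's3://bucket/key' style path and upload the serialized object.
    _bucket, _key = file_path.replace('s3://', '').split('/', 1)
    boto3.client('s3').put_object(Bucket=_bucket, Key=_key, Body=json.dumps(obj))

Routing the local artifact through the new output_path_distribution parameter, instead of a fixed file name, lets the pipeline orchestrator decide where each step's output lands; the optional s3_output_path_distribution mirrors the same pattern for cloud persistence.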
