diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py
index 3d6a4b9..82cc379 100644
--- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py
+++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py
@@ -19,11 +19,11 @@
 PARSER.add_argument('-analytical_data_types', type=Any, required=True, default=None, help='assignment of features to analytical data types')
 PARSER.add_argument('-missing_value_threshold', type=float, required=False, default=0.95, help='threshold to classify features as invalid based on the amount of missing values')
 PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
-PARSER.add_argument('-output_file_path_data_missing_data', type=str, required=True, default=None, help='file path of features containing too much missing data')
-PARSER.add_argument('-output_file_path_invariant', type=str, required=True, default=None, help='file path of invariant features')
-PARSER.add_argument('-output_file_path_duplicated', type=str, required=True, default=None, help='file path of duplicated features')
-PARSER.add_argument('-output_file_path_valid_features', type=str, required=True, default=None, help='file path of valid features')
-PARSER.add_argument('-output_file_path_prop_valid_features', type=str, required=True, default=None, help='file path of the proportion of valid features')
+PARSER.add_argument('-output_file_path_data_missing_data', type=str, required=True, default=None, help='file path of features containing too much missing data output')
+PARSER.add_argument('-output_file_path_invariant', type=str, required=True, default=None, help='file path of invariant features output')
+PARSER.add_argument('-output_file_path_duplicated', type=str, required=True, default=None, help='file path of duplicated features output')
+PARSER.add_argument('-output_file_path_valid_features', type=str, required=True, default=None, help='file path of valid features output')
+PARSER.add_argument('-output_file_path_prop_valid_features', type=str, required=True, default=None, help='file path of the proportion of valid features output')
 PARSER.add_argument('-s3_output_file_path_data_health_check', type=str, required=False, default=None, help='S3 file path of the data health check output')
 ARGS = PARSER.parse_args()
 
@@ -55,19 +55,19 @@ def data_health_check(data_set_path: str,
         Assigned analytical data types to each feature
 
     :param output_file_path_missing_data: str
-        Path of the features containing too much missing data
+        Path of the features containing too much missing data output
 
     :param output_file_path_invariant: str
-        Path of the invariant features
+        Path of the invariant features output
 
     :param output_file_path_duplicated: str
-        Path of the duplicated features
+        Path of the duplicated features output
 
     :param output_file_path_valid_features: str
-        Path of the valid features
+        Path of the valid features output
 
     :param output_file_path_prop_valid_features: str
-        Path of the proportion of valid features
+        Path of the proportion of valid features output
 
     :param missing_value_threshold: float
         Threshold of missing values to exclude numeric feature
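For context, a minimal sketch of how the data health check task container could be invoked once the renamed output flags land. The `-data_set_path` flag, the bucket, and all file paths are hypothetical assumptions; only the `output_file_path_*` flags are taken from the hunk above:

```python
# Hypothetical invocation of the data health check task container.
# The -data_set_path flag and all paths are illustrative assumptions;
# the output_file_path_* flags mirror the renamed arguments above.
import subprocess

subprocess.run(
    [
        'python', 'task.py',
        '-data_set_path', 's3://example-bucket/data/train.csv',
        '-analytical_data_types', '{"continuous": ["age"], "categorical": ["gender"]}',
        '-missing_value_threshold', '0.95',
        '-output_file_path_data_missing_data', 'missing_data.json',
        '-output_file_path_invariant', 'invariant.json',
        '-output_file_path_duplicated', 'duplicated.json',
        '-output_file_path_valid_features', 'valid_features.json',
        '-output_file_path_prop_valid_features', 'prop_valid_features.json'
    ],
    check=True
)
```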
diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/parallelizer/task_container/src/task.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/parallelizer/task_container/src/task.py
index 533e792..741ad22 100644
--- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/parallelizer/task_container/src/task.py
+++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/parallelizer/task_container/src/task.py
@@ -5,24 +5,43 @@
 """
 
 import argparse
-import json
 
-from parallelizer import distribute_cases, distribute_elements, distribute_features, distribute_file_paths, ParallelizerException
-from typing import NamedTuple
+from aws import save_file_to_s3
+from custom_logger import Log
+from file_handler import file_handler
+from parallelizer import (
+    distribute_analytical_data_types, distribute_cases, distribute_elements, distribute_features,
+    distribute_file_paths, ParallelizerException
+)
+from typing import Any, Dict, List, NamedTuple
 
 
 PARSER = argparse.ArgumentParser(description="parallelize data")
 PARSER.add_argument('-action', type=str, required=True, default=None, help='distribution action')
+PARSER.add_argument('-analytical_data_types', type=Any, required=False, default=None, help='pre-defined analytical data types')
 PARSER.add_argument('-data_file_path', type=str, required=False, default=None, help='complete file path of the input data set')
-PARSER.add_argument('-bucket_name', type=str, required=False, default=None, help='name of the S3 bucket')
+PARSER.add_argument('-s3_bucket_name', type=str, required=False, default=None, help='name of the S3 bucket')
+PARSER.add_argument('-chunks', type=int, required=False, default=4, help='number of chunks to distribute')
+PARSER.add_argument('-persist_data', type=int, required=False, default=1, help='whether to persist distributed chunks or not')
+PARSER.add_argument('-elements', type=list, required=False, default=None, help='elements to distribute')
+PARSER.add_argument('-prefix', type=str, required=False, default=None, help='prefix used for filtering folders in S3 bucket')
 PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
+PARSER.add_argument('-output_path_distribution', type=str, required=False, default=None, help='file path of the distribution output')
+PARSER.add_argument('-s3_output_path_distribution', type=str, required=False, default=None, help='S3 file path of the distribution output')
 ARGS = PARSER.parse_args()
 
 
 def parallelizer(action: str,
+                 output_path_distribution: str,
+                 analytical_data_types: Dict[str, List[str]] = None,
                  data_file_path: str = None,
                  bucket_name: str = None,
-                 sep: str = ','
+                 chunks: int = 4,
+                 persist_data: bool = True,
+                 elements: list = None,
+                 prefix: str = None,
+                 sep: str = ',',
+                 s3_output_path_distribution: str = None
                  ) -> NamedTuple('outputs', [('distributed_values', list)]):
     """
     Parallelize data
@@ -34,49 +53,85 @@ def parallelizer(action: str,
             -> features: features of given data set
             -> file_paths: file path in given S3 bucket
 
+    :param output_path_distribution: str
+        Path of the distribution output
+
+    :param analytical_data_types: dict
+        Assigned analytical data types to each feature
+
     :param data_file_path: str
         Complete file path of the data set
 
     :param bucket_name: str
         Name of the S3 bucket
 
+    :param chunks: int
+        Number of chunks to distribute
+
+    :param persist_data: bool
+        Whether to persist distributed chunks or not
+
+    :param elements: list
+        Elements to distribute
+
+    :param prefix: str
+        Prefix used for filtering folders in S3 bucket
+
     :param sep: str
         Separator
 
+    :param s3_output_path_distribution: str
+        Complete file path of the distribution output
+
     :return: NamedTuple
         Distributed values
     """
-    if action == 'cases':
+    if action == 'analytical_data_types':
+        _distributed_values: list = distribute_analytical_data_types(analytical_data_types=analytical_data_types,
+                                                                      file_path=data_file_path,
+                                                                      persist_data=persist_data,
+                                                                      sep=sep
+                                                                      )
+    elif action == 'cases':
         _distributed_values: list = distribute_cases(file_path=data_file_path,
-                                                     chunks=4,
-                                                     persist_data=True,
+                                                     chunks=chunks,
+                                                     persist_data=persist_data,
                                                      sep=sep
                                                      )
     elif action == 'elements':
-        _distributed_values: list = distribute_elements(elements=[],
-                                                        chunks=5
+        _distributed_values: list = distribute_elements(elements=elements,
+                                                        chunks=chunks
                                                         )
     elif action == 'features':
         _distributed_values: list = distribute_features(file_path=data_file_path,
-                                                        persist_data=True,
-                                                        chunks=None,
+                                                        persist_data=persist_data,
+                                                        chunks=chunks,
                                                         sep=sep
                                                         )
     elif action == 'file_paths':
-        _distributed_values: list = distribute_file_paths(chunks=4,
+        _distributed_values: list = distribute_file_paths(chunks=chunks,
                                                           bucket_name=bucket_name,
-                                                          prefix=None
+                                                          prefix=prefix
                                                           )
     else:
         raise ParallelizerException(f'Action ({action}) not supported')
-    with open('distributed_values.json', 'w') as _file:
-        json.dump(_distributed_values, _file)
+    file_handler(file_path=output_path_distribution, obj=_distributed_values)
+    if s3_output_path_distribution is not None:
+        save_file_to_s3(file_path=s3_output_path_distribution, obj=_distributed_values)
+        Log().log(msg=f'Save distribution: {s3_output_path_distribution}')
     return [_distributed_values]
 
 
 if __name__ == '__main__':
     parallelizer(action=ARGS.action,
+                 output_path_distribution=ARGS.output_path_distribution,
+                 analytical_data_types=ARGS.analytical_data_types,
                  data_file_path=ARGS.data_file_path,
-                 bucket_name=ARGS.bucket_name,
-                 sep=ARGS.sep
+                 bucket_name=ARGS.s3_bucket_name,
+                 chunks=ARGS.chunks,
+                 persist_data=bool(ARGS.persist_data),
+                 elements=ARGS.elements,
+                 prefix=ARGS.prefix,
+                 sep=ARGS.sep,
+                 s3_output_path_distribution=ARGS.s3_output_path_distribution
                  )
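A minimal in-process sketch of the reworked signature, assuming `distribute_elements` splits the given list across the requested number of chunks; the element values and output path are made up:

```python
# Hypothetical call of the updated parallelizer; assumes the task's own
# imports (file_handler, distribute_elements, ...) resolve. Values are
# illustrative, and the exact chunking is up to distribute_elements.
result = parallelizer(action='elements',
                      output_path_distribution='distributed_values.json',
                      elements=['a.csv', 'b.csv', 'c.csv', 'd.csv'],
                      chunks=2)
# result[0] holds the chunked elements, e.g. [['a.csv', 'b.csv'], ['c.csv', 'd.csv']],
# and the same object is persisted to distributed_values.json via file_handler.
```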