From 91fbbda1c93eb96c86c2d5ab4d20bd5855e3a157 Mon Sep 17 00:00:00 2001 From: giannibalistreri Date: Sat, 30 Dec 2023 00:59:16 +0100 Subject: [PATCH] Refactoring --- .../task_container/src/feature_engineering.py | 48 ++++-- .../task_container/src/task.py | 138 +++++++++--------- 2 files changed, 105 insertions(+), 81 deletions(-) diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py index 7e59a67..e4ce327 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py @@ -70,6 +70,7 @@ def __init__(self, df: pd.DataFrame, analytical_data_types: Dict[str, List[str]], features: List[str] = None, + target_feature: str = None, processing_memory: dict = None, feature_engineering_config: Dict[str, list] = None ): @@ -83,6 +84,9 @@ def __init__(self, :param features: List[str] Name of the features + :param target_feature: str + Name of the target feature + :param processing_memory: dict Processing memory @@ -90,6 +94,7 @@ def __init__(self, Pre-defined configuration """ self.df: pd.DataFrame = df + self.target_feature: str = target_feature self.features: List[str] = self.df.columns.tolist() if features is None else features if processing_memory is None: self.processing_memory: dict = dict(level={'0': df.columns.tolist()}, @@ -98,9 +103,7 @@ def __init__(self, analytical_data_types=analytical_data_types, next_level_numeric_features_base=[], next_level_categorical_features_base=[], - numeric_features=[], - categorical_features=[], - exclude=[] + new_target_feature=target_feature ) else: self.processing_memory: dict = processing_memory @@ -223,12 +226,23 @@ def _process_memory(self, meth: str, param: dict, feature: 
str, interactor: str, """ Process memory - :param meth: - :param param: - :param feature: - :param interactor: - :param new_feature: - :param categorical: + :param meth: str + Name of the feature engineering (class method) + + :param param: dict + Parameter setting + + :param feature: str + Name of the feature + + :param interactor: str + Name of the interactor feature + + :param new_feature: str + Name of the new generated feature + + :param categorical: bool + Whether new generated feature is categorical or numeric """ self.processing_memory['level'][str(self.level)].update({new_feature: dict(meth=meth, param=param, @@ -252,10 +266,16 @@ def _process_memory(self, meth: str, param: dict, feature: str, interactor: str, self.processing_memory['feature_relations'][new_feature].append(interactor) if categorical: self.processing_memory['analytical_data_types']['categorical'].append(new_feature) - self.processing_memory['next_level_categorical_features_base'].append(new_feature) + if feature == self.target_feature: + self.processing_memory['new_target_feature'] = new_feature + else: + self.processing_memory['next_level_categorical_features_base'].append(new_feature) else: self.processing_memory['analytical_data_types']['continuous'].append(new_feature) - self.processing_memory['next_level_numeric_features_base'].append(new_feature) + if feature == self.target_feature: + self.processing_memory['new_target_feature'] = new_feature + else: + self.processing_memory['next_level_numeric_features_base'].append(new_feature) def add(self, feature_name: str, interaction_feature_name: str) -> np.ndarray: """ @@ -523,7 +543,7 @@ def log_transform(self, feature_name: str) -> np.ndarray: """ return np.log(self.df[feature_name].values) - def main(self, feature_engineering_config: Dict[str, list]) -> pd.DataFrame: + def main(self) -> pd.DataFrame: """ Apply feature engineering using (tabular) structured data @@ -532,9 +552,9 @@ def main(self, feature_engineering_config: Dict[str, list]) 
-> pd.DataFrame: """ self.processing_memory['level'].update({str(self.level): {}}) _df: pd.DataFrame = pd.DataFrame() - for meth in feature_engineering_config.keys(): + for meth in self.feature_engineering_config.keys(): _engineering_meth = getattr(self, meth, None) - for element in feature_engineering_config[meth]: + for element in self.feature_engineering_config[meth]: if isinstance(element, str): _param: dict = dict(feature_name=element) elif isinstance(element, tuple): diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/task.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/task.py index fbc308f..2c728cb 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/task.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/task.py @@ -5,12 +5,11 @@ """ import argparse -import boto3 import pandas as pd from aws import load_file_from_s3, save_file_to_s3 from custom_logger import Log -from feature_engineering import ENGINEERING_METH, MIN_FEATURES_BY_METH, FeatureEngineer +from feature_engineering import FeatureEngineer from file_handler import file_handler from typing import Any, Dict, NamedTuple, List @@ -18,37 +17,38 @@ PARSER.add_argument('-data_set_path', type=str, required=True, default=None, help='file path of the data set') PARSER.add_argument('-analytical_data_types', type=Any, required=True, default=None, help='assignment of features to analytical data types') PARSER.add_argument('-target_feature', type=str, required=True, default=None, help='name of the target feature') -PARSER.add_argument('-output_bucket_name', type=str, required=True, default=None, help='name of the S3 output bucket') -PARSER.add_argument('-output_file_path_data_set', type=str, required=True, default=None, help='file path of the data set') -PARSER.add_argument('-output_file_path_processor_memory', 
type=str, required=True, default=None, help='file path of output processing memory') -PARSER.add_argument('-output_file_path_target', type=str, required=True, default=None, help='file path of the output target feature') -PARSER.add_argument('-output_file_path_predictors', type=str, required=True, default=None, help='file path of the output predictors') -PARSER.add_argument('-output_file_path_engineered_feature_names', type=str, required=True, default=None, help='file path of the output processed features') -PARSER.add_argument('-re_engineering', type=bool, required=False, default=False, help='whether to re-engineer features for inference or not') -PARSER.add_argument('-next_level', type=bool, required=False, default=False, help='whether to generate deeper engineered features or not') +PARSER.add_argument('-re_engineering', type=int, required=False, default=False, help='whether to re-engineer features for inference or not') +PARSER.add_argument('-next_level', type=int, required=False, default=False, help='whether to generate deeper engineered features or not') PARSER.add_argument('-feature_engineering_config', type=Any, required=False, default=None, help='feature engineering pre-defined config file') -PARSER.add_argument('-feature_names', type=str, required=False, default=None, help='pre-defined feature names used for feature engineering') +PARSER.add_argument('-features', type=str, required=False, default=None, help='feature names used for feature engineering') PARSER.add_argument('-exclude', type=str, required=False, default=None, help='pre-defined feature names to exclude') -PARSER.add_argument('-exclude_original_data', type=str, required=False, default=None, help='exclude all original features (especially numeric features)') +PARSER.add_argument('-exclude_original_data', type=int, required=False, default=0, help='whether to exclude all original features (especially numeric features) or not') PARSER.add_argument('-sep', type=str, required=False, default=',', 
help='column separator') +PARSER.add_argument('-output_file_path_predictors', type=str, required=True, default=None, help='file path of the predictors output') +PARSER.add_argument('-output_file_path_new_target_feature', type=str, required=True, default=None, help='file path of the new target feature output') +PARSER.add_argument('-output_file_path_analytical_data_types', type=str, required=True, default=None, help='file path of the analytical data types output') +PARSER.add_argument('-s3_output_file_path_data_set', type=str, required=True, default=None, help='S3 file path of the engineered data set') +PARSER.add_argument('-s3_output_file_path_processor_memory', type=str, required=True, default=None, help='S3 file path of the processor memory') ARGS = PARSER.parse_args() def feature_engineer(data_set_path: str, - analytical_data_types: dict, - target_feature_name: str, + analytical_data_types: Dict[str, List[str]], + target_feature: str, s3_output_file_path_data_set: str, s3_output_file_path_processor_memory: str, output_file_path_predictors: str, + output_file_path_new_target_feature: str, output_file_path_analytical_data_types: str, re_engineering: bool = False, next_level: bool = False, feature_engineering_config: Dict[str, list] = None, - feature_names: List[str] = None, + features: List[str] = None, exclude: List[str] = None, exclude_original_data: bool = False, sep: str = ',' ) -> NamedTuple('outputs', [('predictors', list), + ('new_target_feature', str), ('analytical_data_types', dict) ] ): @@ -61,23 +61,23 @@ def feature_engineer(data_set_path: str, :param analytical_data_types: dict Assigned analytical data types to each feature - :param target_feature_name: str + :param target_feature: str Name of the target feature - :param output_file_path_data_set: str - Path of the data set to save + :param s3_output_file_path_data_set: str + Complete file path of the data set to save - :param output_file_path_processor_memory: str - Path of the processing memory to 
save - - :param output_file_path_target: str - Path of the target feature to save + :param s3_output_file_path_processor_memory: str + Complete file path of the processing memory to save :param output_file_path_predictors: str - Path of the predictors to save + Path of the predictors output + + :param output_file_path_new_target_feature: str + Path of the new target feature output - :param output_file_path_engineered_feature_names: str - Path of the engineered feature names + :param output_file_path_analytical_data_types: str + Path of the updated analytical data types output :param re_engineering: bool Whether to re-engineer features for inference or to engineer for training @@ -88,7 +88,7 @@ def feature_engineer(data_set_path: str, :param feature_engineering_config: Dict[str, list] Pre-defined configuration - :param feature_names: List[str] + :param features: List[str] Name of the features :param exclude: List[str] @@ -104,33 +104,43 @@ def feature_engineer(data_set_path: str, Path of the engineered data set """ _df: pd.DataFrame = pd.read_csv(filepath_or_buffer=data_set_path, sep=sep) - _features: List[str] = _df.columns.tolist() if feature_names is None else feature_names + _features: List[str] = _df.columns.tolist() if features is None else features + _target_feature: str = target_feature + if _target_feature in _features: + del _features[_features.index(_target_feature)] + _predictors: List[str] = _features if re_engineering: - _predictors: List[str] = _features _feature_names_engineered: List[str] = None - _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory, - encoding='utf-8' - ) - _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, processing_memory=_processing_memory) - _df_engineered = _feature_engineer.re_engineering(features=_predictors) + _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory) + _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, + 
analytical_data_types=analytical_data_types, + features=_features, + target_feature=_target_feature, + processing_memory=_processing_memory, + feature_engineering_config=feature_engineering_config + ) + _df_engineered = _feature_engineer.re_engineering(features=_features) + _updated_analytical_data_types: Dict[str, List[str]] = _feature_engineer.processing_memory.get('analytical_data_types') + _new_target_feature: str = _target_feature else: if next_level: - _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory, - encoding='utf-8' - ) - _feature_engineering_config: Dict[str, list] = {} + _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory) else: _processing_memory: dict = None - if feature_engineering_config is None: - _feature_engineering_config: Dict[str, list] = {} - - else: - _feature_engineering_config: Dict[str, list] = feature_engineering_config - _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, processing_memory=_processing_memory) - _df_engineered = _feature_engineer.main(feature_engineering_config=_feature_engineering_config) - pd.concat(objs=[_df, _df_engineered], axis=1).to_csv(path_or_buf=s3_output_file_path_data_set, sep=sep, index=False) + _feature_engineering_config: Dict[str, list] = feature_engineering_config + _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, + analytical_data_types=analytical_data_types, + features=_features, + target_feature=_target_feature, + processing_memory=_processing_memory, + feature_engineering_config=_feature_engineering_config + ) + _df_engineered = _feature_engineer.main() + _updated_analytical_data_types: Dict[str, List[str]] = _feature_engineer.processing_memory.get('analytical_data_types') + _new_target_feature: str = _feature_engineer.processing_memory['new_target_feature'] + pd.concat(objs=[_df, _df_engineered], axis=1).to_csv(path_or_buf=s3_output_file_path_data_set, sep=sep, header=True, index=False) + 
Log().log(msg=f'Save engineered data set: {s3_output_file_path_data_set}') _feature_names_engineered: List[str] = _df_engineered.columns.tolist() - _predictors: List[str] = _features _predictors.extend(_feature_names_engineered) if exclude is not None: for feature in exclude: @@ -154,36 +164,30 @@ def feature_engineer(data_set_path: str, del _predictors[_predictors.index(non_numeric)] Log().log(msg=f'Exclude original (non-numeric) feature "{non_numeric}"') _predictors = sorted(_predictors) - for file_path, obj, customized_file_path in [(output_file_path_data_set, output_file_path_data_set, output_file_path_data_set_customized), - (output_file_path_target, target_feature_name, output_file_path_target_customized), - (output_file_path_predictors, _predictors, output_file_path_predictors_customized), - (output_file_path_engineered_feature_names, _feature_names_engineered, output_file_path_engineered_feature_names_customized) - ]: + for file_path, obj in [(output_file_path_predictors, _predictors), + (output_file_path_new_target_feature, _new_target_feature), + (output_file_path_analytical_data_types, _updated_analytical_data_types) + ]: file_handler(file_path=file_path, obj=obj) - if customized_file_path is not None: - save_file_to_s3(file_path=customized_file_path, obj=obj) - return [output_file_path_data_set, - output_file_path_processor_memory, - _feature_names_engineered, - _predictors, - target_feature_name - ] + save_file_to_s3(file_path=s3_output_file_path_processor_memory, obj=_feature_engineer.processing_memory) + Log().log(msg=f'Save processing memory: {s3_output_file_path_processor_memory}') + return [_predictors, _new_target_feature, _updated_analytical_data_types] if __name__ == '__main__': feature_engineer(data_set_path=ARGS.data_set_path, analytical_data_types=ARGS.analytical_data_types, - target_feature_name=ARGS.target_feature_name, - output_file_path_data_set=ARGS.output_file_path_data_set, - 
output_file_path_processor_memory=ARGS.output_file_path_processor_memory,
-                     output_file_path_target=ARGS.output_file_path_target,
+                     target_feature=ARGS.target_feature,
+                     s3_output_file_path_data_set=ARGS.s3_output_file_path_data_set,
+                     s3_output_file_path_processor_memory=ARGS.s3_output_file_path_processor_memory,
                      output_file_path_predictors=ARGS.output_file_path_predictors,
-                     output_file_path_engineered_feature_names=ARGS.output_file_path_engineered_feature_names,
+                     output_file_path_new_target_feature=ARGS.output_file_path_new_target_feature,
+                     output_file_path_analytical_data_types=ARGS.output_file_path_analytical_data_types,
                      re_engineering=ARGS.re_engineering,
                      next_level=ARGS.next_level,
                      feature_engineering_config=ARGS.feature_engineering_config,
-                     feature_names=ARGS.feature_names,
+                     features=ARGS.features,
                      exclude=ARGS.exclude,
-                     exclude_original_data=ARGS.exclude_original_data,
+                     exclude_original_data=bool(ARGS.exclude_original_data),
                      sep=ARGS.sep
                      )