Refactoring
giannibalistreri committed Dec 29, 2023
1 parent d252fd8 commit 91fbbda
Showing 2 changed files with 105 additions and 81 deletions.
@@ -70,6 +70,7 @@ def __init__(self,
df: pd.DataFrame,
analytical_data_types: Dict[str, List[str]],
features: List[str] = None,
+ target_feature: str = None,
processing_memory: dict = None,
feature_engineering_config: Dict[str, list] = None
):
@@ -83,13 +84,17 @@ def __init__(self,
:param features: List[str]
Name of the features
+ :param target_feature: str
+ Name of the target feature
:param processing_memory: dict
Processing memory
:param feature_engineering_config: Dict[str, list]
Pre-defined configuration
"""
self.df: pd.DataFrame = df
+ self.target_feature: str = target_feature
self.features: List[str] = self.df.columns.tolist() if features is None else features
if processing_memory is None:
self.processing_memory: dict = dict(level={'0': df.columns.tolist()},
@@ -98,9 +103,7 @@
analytical_data_types=analytical_data_types,
next_level_numeric_features_base=[],
next_level_categorical_features_base=[],
- numeric_features=[],
- categorical_features=[],
- exclude=[]
+ new_target_feature=target_feature
)
else:
self.processing_memory: dict = processing_memory
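
With this refactoring the bookkeeping dict also carries the target. A minimal sketch of the initial processing_memory assembled by __init__ above, assuming features 'x' and 'y' and target 'price' (the names are hypothetical, and the feature_relations key is only inferred from its use in _process_memory below):

processing_memory = dict(level={'0': ['x', 'y', 'price']},
                         feature_relations={},  # assumed; populated by _process_memory below
                         analytical_data_types={'continuous': ['x', 'y', 'price'], 'categorical': []},
                         next_level_numeric_features_base=[],
                         next_level_categorical_features_base=[],
                         new_target_feature='price')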
@@ -223,12 +226,23 @@ def _process_memory(self, meth: str, param: dict, feature: str, interactor: str,
"""
Process memory
- :param meth:
- :param param:
- :param feature:
- :param interactor:
- :param new_feature:
- :param categorical:
+ :param meth: str
+ Name of the feature engineering (class) method
+ :param param: dict
+ Parameter setting
+ :param feature: str
+ Name of the feature
+ :param interactor: str
+ Name of the interactor feature
+ :param new_feature: str
+ Name of the newly generated feature
+ :param categorical: bool
+ Whether the newly generated feature is categorical or numeric
"""
self.processing_memory['level'][str(self.level)].update({new_feature: dict(meth=meth,
param=param,
@@ -252,10 +266,16 @@
self.processing_memory['feature_relations'][new_feature].append(interactor)
if categorical:
self.processing_memory['analytical_data_types']['categorical'].append(new_feature)
- self.processing_memory['next_level_categorical_features_base'].append(new_feature)
+ if feature == self.target_feature:
+ self.processing_memory['new_target_feature'] = new_feature
+ else:
+ self.processing_memory['next_level_categorical_features_base'].append(new_feature)
else:
self.processing_memory['analytical_data_types']['continuous'].append(new_feature)
- self.processing_memory['next_level_numeric_features_base'].append(new_feature)
+ if feature == self.target_feature:
+ self.processing_memory['new_target_feature'] = new_feature
+ else:
+ self.processing_memory['next_level_numeric_features_base'].append(new_feature)
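
The two branches added above reroute anything engineered from the target feature: instead of extending the next-level feature base, it becomes the new target. A standalone sketch of that routing, with hypothetical feature names:

memory = {'new_target_feature': 'price', 'next_level_numeric_features_base': []}

def route(memory: dict, feature: str, new_feature: str, target_feature: str) -> None:
    # A feature derived from the target replaces the target for the next level;
    # everything else feeds the next-level numeric feature base.
    if feature == target_feature:
        memory['new_target_feature'] = new_feature
    else:
        memory['next_level_numeric_features_base'].append(new_feature)

route(memory, feature='price', new_feature='price_log', target_feature='price')
assert memory['new_target_feature'] == 'price_log'
route(memory, feature='x', new_feature='x_log', target_feature='price')
assert memory['next_level_numeric_features_base'] == ['x_log']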

def add(self, feature_name: str, interaction_feature_name: str) -> np.ndarray:
"""
@@ -523,7 +543,7 @@ def log_transform(self, feature_name: str) -> np.ndarray:
"""
return np.log(self.df[feature_name].values)

- def main(self, feature_engineering_config: Dict[str, list]) -> pd.DataFrame:
+ def main(self) -> pd.DataFrame:
"""
Apply feature engineering using (tabular) structured data
@@ -532,9 +552,9 @@
"""
self.processing_memory['level'].update({str(self.level): {}})
_df: pd.DataFrame = pd.DataFrame()
- for meth in feature_engineering_config.keys():
+ for meth in self.feature_engineering_config.keys():
_engineering_meth = getattr(self, meth, None)
- for element in feature_engineering_config[meth]:
+ for element in self.feature_engineering_config[meth]:
if isinstance(element, str):
_param: dict = dict(feature_name=element)
elif isinstance(element, tuple):
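
main() now reads its configuration from the instance instead of a parameter; the dispatch itself is unchanged: each configured method name is resolved with getattr and invoked once per element, where a plain string names a single feature and a tuple carries a feature plus its interactor. A self-contained sketch under those assumptions (the method set and column naming here are illustrative, not the repository's):

import numpy as np
import pandas as pd

class Sketch:
    def __init__(self, df: pd.DataFrame, config: dict) -> None:
        self.df = df
        self.feature_engineering_config = config

    def log_transform(self, feature_name: str) -> np.ndarray:
        return np.log(self.df[feature_name].values)

    def add(self, feature_name: str, interaction_feature_name: str) -> np.ndarray:
        return self.df[feature_name].values + self.df[interaction_feature_name].values

    def main(self) -> pd.DataFrame:
        _df = pd.DataFrame()
        for meth in self.feature_engineering_config.keys():
            _engineering_meth = getattr(self, meth, None)  # resolve class method by name
            if _engineering_meth is None:
                continue  # skip unknown method names instead of raising
            for element in self.feature_engineering_config[meth]:
                if isinstance(element, str):
                    _df[f'{element}_{meth}'] = _engineering_meth(feature_name=element)
                elif isinstance(element, tuple):
                    _df[f'{element[0]}_{meth}_{element[1]}'] = _engineering_meth(feature_name=element[0],
                                                                                 interaction_feature_name=element[1])
        return _df

print(Sketch(df=pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0]}),
             config={'log_transform': ['x'], 'add': [('x', 'y')]}).main())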
@@ -5,50 +5,50 @@
"""

import argparse
import boto3
import pandas as pd

from aws import load_file_from_s3, save_file_to_s3
from custom_logger import Log
- from feature_engineering import ENGINEERING_METH, MIN_FEATURES_BY_METH, FeatureEngineer
+ from feature_engineering import FeatureEngineer
from file_handler import file_handler
from typing import Any, Dict, NamedTuple, List

PARSER = argparse.ArgumentParser(description="feature engineering")
PARSER.add_argument('-data_set_path', type=str, required=True, default=None, help='file path of the data set')
PARSER.add_argument('-analytical_data_types', type=Any, required=True, default=None, help='assignment of features to analytical data types')
PARSER.add_argument('-target_feature', type=str, required=True, default=None, help='name of the target feature')
- PARSER.add_argument('-output_bucket_name', type=str, required=True, default=None, help='name of the S3 output bucket')
- PARSER.add_argument('-output_file_path_data_set', type=str, required=True, default=None, help='file path of the data set')
- PARSER.add_argument('-output_file_path_processor_memory', type=str, required=True, default=None, help='file path of output processing memory')
- PARSER.add_argument('-output_file_path_target', type=str, required=True, default=None, help='file path of the output target feature')
- PARSER.add_argument('-output_file_path_predictors', type=str, required=True, default=None, help='file path of the output predictors')
- PARSER.add_argument('-output_file_path_engineered_feature_names', type=str, required=True, default=None, help='file path of the output processed features')
- PARSER.add_argument('-re_engineering', type=bool, required=False, default=False, help='whether to re-engineer features for inference or not')
- PARSER.add_argument('-next_level', type=bool, required=False, default=False, help='whether to generate deeper engineered features or not')
+ PARSER.add_argument('-re_engineering', type=int, required=False, default=False, help='whether to re-engineer features for inference or not')
+ PARSER.add_argument('-next_level', type=int, required=False, default=False, help='whether to generate deeper engineered features or not')
PARSER.add_argument('-feature_engineering_config', type=Any, required=False, default=None, help='feature engineering pre-defined config file')
- PARSER.add_argument('-feature_names', type=str, required=False, default=None, help='pre-defined feature names used for feature engineering')
+ PARSER.add_argument('-features', type=str, required=False, default=None, help='feature names used for feature engineering')
PARSER.add_argument('-exclude', type=str, required=False, default=None, help='pre-defined feature names to exclude')
- PARSER.add_argument('-exclude_original_data', type=str, required=False, default=None, help='exclude all original features (especially numeric features)')
+ PARSER.add_argument('-exclude_original_data', type=int, required=False, default=0, help='whether to exclude all original features (especially numeric features) or not')
PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
+ PARSER.add_argument('-output_file_path_predictors', type=str, required=True, default=None, help='file path of the predictors output')
+ PARSER.add_argument('-output_file_path_new_target_feature', type=str, required=True, default=None, help='file path of the new target feature output')
+ PARSER.add_argument('-output_file_path_analytical_data_types', type=str, required=True, default=None, help='file path of the analytical data types output')
+ PARSER.add_argument('-s3_output_file_path_data_set', type=str, required=True, default=None, help='S3 file path of the engineered data set')
+ PARSER.add_argument('-s3_output_file_path_processor_memory', type=str, required=True, default=None, help='S3 file path of the processor memory')
ARGS = PARSER.parse_args()
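
Moving -re_engineering and -next_level from type=bool to type=int fixes a standard argparse pitfall: argparse applies the type callable to the raw string, and bool('False') is True because any non-empty string is truthy. Passing 0/1 as an int and casting is reliable; a minimal demonstration:

import argparse

demo = argparse.ArgumentParser()
demo.add_argument('-broken', type=bool, required=False, default=False)
demo.add_argument('-works', type=int, required=False, default=0)
args = demo.parse_args(['-broken', 'False', '-works', '0'])
print(args.broken)       # True: bool('False') sees a non-empty string
print(bool(args.works))  # False: int('0') == 0 casts cleanly to False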


def feature_engineer(data_set_path: str,
- analytical_data_types: dict,
- target_feature_name: str,
+ analytical_data_types: Dict[str, List[str]],
+ target_feature: str,
+ s3_output_file_path_data_set: str,
+ s3_output_file_path_processor_memory: str,
output_file_path_predictors: str,
+ output_file_path_new_target_feature: str,
+ output_file_path_analytical_data_types: str,
re_engineering: bool = False,
next_level: bool = False,
feature_engineering_config: Dict[str, list] = None,
- feature_names: List[str] = None,
+ features: List[str] = None,
exclude: List[str] = None,
exclude_original_data: bool = False,
sep: str = ','
) -> NamedTuple('outputs', [('predictors', list),
('new_target_feature', str),
('analytical_data_types', dict)
]
):
@@ -61,23 +61,23 @@ def feature_engineer(data_set_path: str,
:param analytical_data_types: dict
Assigned analytical data types to each feature
- :param target_feature_name: str
+ :param target_feature: str
Name of the target feature
- :param output_file_path_data_set: str
- Path of the data set to save
+ :param s3_output_file_path_data_set: str
+ Complete file path of the data set to save
- :param output_file_path_processor_memory: str
- Path of the processing memory to save
- :param output_file_path_target: str
- Path of the target feature to save
+ :param s3_output_file_path_processor_memory: str
+ Complete file path of the processing memory to save
:param output_file_path_predictors: str
- Path of the predictors to save
+ Path of the predictors output
+ :param output_file_path_new_target_feature: str
+ Path of the new target feature output
- :param output_file_path_engineered_feature_names: str
- Path of the engineered feature names
+ :param output_file_path_analytical_data_types: str
+ Path of the updated analytical data types output
:param re_engineering: bool
Whether to re-engineer features for inference or to engineer for training
@@ -88,7 +88,7 @@
:param feature_engineering_config: Dict[str, list]
Pre-defined configuration
- :param feature_names: List[str]
+ :param features: List[str]
Name of the features
:param exclude: List[str]
@@ -104,33 +104,43 @@
Path of the engineered data set
"""
_df: pd.DataFrame = pd.read_csv(filepath_or_buffer=data_set_path, sep=sep)
- _features: List[str] = _df.columns.tolist() if feature_names is None else feature_names
+ _features: List[str] = _df.columns.tolist() if features is None else features
+ _target_feature: str = target_feature
+ if _target_feature in _features:
+ del _features[_features.index(_target_feature)]
+ _predictors: List[str] = _features
if re_engineering:
- _predictors: List[str] = _features
- _feature_names_engineered: List[str] = None
- _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory,
- encoding='utf-8'
- )
- _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, processing_memory=_processing_memory)
- _df_engineered = _feature_engineer.re_engineering(features=_predictors)
+ _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory)
+ _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df,
+ analytical_data_types=analytical_data_types,
+ features=_features,
+ target_feature=_target_feature,
+ processing_memory=_processing_memory,
+ feature_engineering_config=feature_engineering_config
+ )
+ _df_engineered = _feature_engineer.re_engineering(features=_features)
+ _updated_analytical_data_types: Dict[str, List[str]] = _feature_engineer.processing_memory.get('analytical_data_types')
+ _new_target_feature: str = _target_feature
else:
if next_level:
- _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory,
- encoding='utf-8'
- )
- _feature_engineering_config: Dict[str, list] = {}
+ _processing_memory: dict = load_file_from_s3(file_path=s3_output_file_path_processor_memory)
else:
_processing_memory: dict = None
if feature_engineering_config is None:
_feature_engineering_config: Dict[str, list] = {}

else:
- _feature_engineering_config: Dict[str, list] = feature_engineering_config
- _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df, processing_memory=_processing_memory)
- _df_engineered = _feature_engineer.main(feature_engineering_config=_feature_engineering_config)
- pd.concat(objs=[_df, _df_engineered], axis=1).to_csv(path_or_buf=s3_output_file_path_data_set, sep=sep, index=False)
+ _feature_engineering_config: Dict[str, list] = feature_engineering_config
+ _feature_engineer: FeatureEngineer = FeatureEngineer(df=_df,
+ analytical_data_types=analytical_data_types,
+ features=_features,
+ target_feature=_target_feature,
+ processing_memory=_processing_memory,
+ feature_engineering_config=_feature_engineering_config
+ )
+ _df_engineered = _feature_engineer.main()
+ _updated_analytical_data_types: Dict[str, List[str]] = _feature_engineer.processing_memory.get('analytical_data_types')
+ _new_target_feature: str = _feature_engineer.processing_memory['new_target_feature']
+ pd.concat(objs=[_df, _df_engineered], axis=1).to_csv(path_or_buf=s3_output_file_path_data_set, sep=sep, header=True, index=False)
Log().log(msg=f'Save engineered data set: {s3_output_file_path_data_set}')
_feature_names_engineered: List[str] = _df_engineered.columns.tolist()
- _predictors: List[str] = _features
_predictors.extend(_feature_names_engineered)
if exclude is not None:
for feature in exclude:
@@ -154,36 +164,30 @@
del _predictors[_predictors.index(non_numeric)]
Log().log(msg=f'Exclude original (non-numeric) feature "{non_numeric}"')
_predictors = sorted(_predictors)
- for file_path, obj, customized_file_path in [(output_file_path_data_set, output_file_path_data_set, output_file_path_data_set_customized),
- (output_file_path_target, target_feature_name, output_file_path_target_customized),
- (output_file_path_predictors, _predictors, output_file_path_predictors_customized),
- (output_file_path_engineered_feature_names, _feature_names_engineered, output_file_path_engineered_feature_names_customized)
- ]:
+ for file_path, obj in [(output_file_path_predictors, _predictors),
+ (output_file_path_new_target_feature, _new_target_feature),
+ (output_file_path_analytical_data_types, _updated_analytical_data_types)
+ ]:
file_handler(file_path=file_path, obj=obj)
- if customized_file_path is not None:
- save_file_to_s3(file_path=customized_file_path, obj=obj)
- return [output_file_path_data_set,
- output_file_path_processor_memory,
- _feature_names_engineered,
- _predictors,
- target_feature_name
- ]
+ save_file_to_s3(file_path=s3_output_file_path_processor_memory, obj=_feature_engineer.processing_memory)
+ Log().log(msg=f'Save processing memory: {s3_output_file_path_processor_memory}')
+ return [_predictors, _new_target_feature, _updated_analytical_data_types]


if __name__ == '__main__':
feature_engineer(data_set_path=ARGS.data_set_path,
analytical_data_types=ARGS.analytical_data_types,
- target_feature_name=ARGS.target_feature_name,
- output_file_path_data_set=ARGS.output_file_path_data_set,
- output_file_path_processor_memory=ARGS.output_file_path_processor_memory,
- output_file_path_target=ARGS.output_file_path_target,
+ target_feature=ARGS.target_feature,
+ s3_output_file_path_data_set=ARGS.s3_output_file_path_data_set,
+ s3_output_file_path_processor_memory=ARGS.s3_output_file_path_processor_memory,
output_file_path_predictors=ARGS.output_file_path_predictors,
- output_file_path_engineered_feature_names=ARGS.output_file_path_engineered_feature_names,
+ output_file_path_new_target_feature=ARGS.output_file_path_new_target_feature,
+ output_file_path_analytical_data_types=ARGS.output_file_path_analytical_data_types,
re_engineering=ARGS.re_engineering,
next_level=ARGS.next_level,
feature_engineering_config=ARGS.feature_engineering_config,
- feature_names=ARGS.feature_names,
+ features=ARGS.features,
exclude=ARGS.exclude,
- exclude_original_data=ARGS.exclude_original_data,
+ exclude_original_data=bool(ARGS.exclude_original_data),
sep=ARGS.sep
)
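
For reference, a hedged sketch of driving the refactored component in-process rather than through the CLI flags above; every path and the config below are hypothetical placeholders:

outputs = feature_engineer(data_set_path='data/train.csv',
                           analytical_data_types={'continuous': ['x'], 'categorical': []},
                           target_feature='y',
                           s3_output_file_path_data_set='s3://my-bucket/engineered.csv',
                           s3_output_file_path_processor_memory='s3://my-bucket/processor_memory.json',
                           output_file_path_predictors='predictors.json',
                           output_file_path_new_target_feature='new_target_feature.json',
                           output_file_path_analytical_data_types='analytical_data_types.json',
                           feature_engineering_config={'log_transform': ['x']})
_predictors, _new_target_feature, _analytical_data_types = outputs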
