From 73f5bf3e398838a591c453bddf81cfdc1650ac0b Mon Sep 17 00:00:00 2001 From: giannibalistreri Date: Thu, 28 Dec 2023 22:43:55 +0100 Subject: [PATCH] Refactoring --- .../task_container/src/feature_engineering.py | 103 +++++++++++++----- 1 file changed, 75 insertions(+), 28 deletions(-) diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py index 27629d8..7e59a67 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/feature_engineering/task_container/src/feature_engineering.py @@ -37,6 +37,11 @@ ) ENGINEERING_METH.update({'ordinal': ENGINEERING_METH.get('categorical')}) ENGINEERING_METH['ordinal'].extend(ENGINEERING_METH.get('continuous')) +ENGINEERING_SCALING_PROCESSOR: List[str] = ['min_max_scaler', + 'normalizer', + 'robust_scaler', + 'standard_scaler' + ] ENGINEERING_NUMERIC_INTERACTION_METH: List[str] = ['add', 'divide', 'multiply', @@ -470,7 +475,7 @@ def generate_re_engineering_instructions(self, features: List[str]) -> List[dict for relation in _feature_relations: for level in range(self.level - 1, 0, -1): if self.processing_memory['level'][str(level)].get(relation) is not None: - _instructions.append(self.processing_memory['level'][str(level)][relation]) + _instructions.append({relation: self.processing_memory['level'][str(level)][relation]}) break return _instructions @@ -734,47 +739,89 @@ def power_transform(self, feature_name: str, exponent: int = 2) -> np.ndarray: """ return np.power(self.df[feature_name].values, exponent) - def re_engineering(self, features: List[str]) -> pd.DataFrame: + def re_engineering(self, features: List[str], instructions: List[dict] = None) -> pd.DataFrame: """ Re-engineer features for inference (batch-prediction) :param features: List[str] Names of features to re-engineer + :param instructions: List[dict] + Re-engineering instructions + :return: pd.DataFrame Re-engineered data set """ _df: pd.DataFrame = pd.DataFrame() - for feature in features: - _feature_relations: List[str] = self._get_feature_relations(feature=feature) - for relation in _feature_relations: + if instructions is None: + for feature in features: + _feature_relations: List[str] = self._get_feature_relations(feature=feature) + for relation in _feature_relations: + for level in range(self.level - 1, 0, -1): + if self.processing_memory['level'][str(level)].get(relation) is not None: + if self.processing_memory['level'][str(level)][relation]['meth'] == 'one_hot_encoder': + _df_one_hot: pd.DataFrame = self.one_hot_decoder(feature_name=relation) + _df = pd.concat(objs=[_df, _df_one_hot], axis=1) + elif self.processing_memory['level'][str(level)][relation]['meth'] == 'label_encoder': + _df[relation] = self.re_labeling(feature_name=relation) + elif self.processing_memory['level'][str(level)][relation]['meth'] in ENGINEERING_SCALING_PROCESSOR: + _df[relation] = self.re_scaler(feature_name=relation) + else: + _param: dict = dict(feature_name=self.processing_memory['level'][str(level)][relation]['feature'], + interaction_feature_name=self.processing_memory['level'][str(level)][relation].get('interactor') + ) + _engineering_meth = getattr(self, self.processing_memory['level'][str(level)][relation]['meth'], None) + _df[relation] = _engineering_meth(**_param) + break for level in range(self.level - 1, 0, -1): - if self.processing_memory['level'][str(level)].get(relation) is not None: - if self.processing_memory['level'][str(level)][relation]['meth'] == 'one_hot_encoder': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'one_hot_merger': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'add': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'divide': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'multiply': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'subtract': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'exp_transform': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'log_transform': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'add': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'add': - pass - elif self.processing_memory['level'][str(level)][relation]['meth'] == 'add': - pass + if self.processing_memory['level'][str(level)].get(feature) is not None: + if self.processing_memory['level'][str(level)][feature]['meth'] == 'one_hot_encoder': + _df_one_hot: pd.DataFrame = self.one_hot_decoder(feature_name=feature) + _df = pd.concat(objs=[_df, _df_one_hot], axis=1) + elif self.processing_memory['level'][str(level)][feature]['meth'] == 'label_encoder': + _df[feature] = self.re_labeling(feature_name=feature) + elif self.processing_memory['level'][str(level)][feature]['meth'] in ENGINEERING_SCALING_PROCESSOR: + _df[feature] = self.re_scaler(feature_name=feature) + else: + _param: dict = dict(feature_name=self.processing_memory['level'][str(level)][feature]['feature'], + interaction_feature_name=self.processing_memory['level'][str(level)][feature].get('interactor') + ) + _engineering_meth = getattr(self, self.processing_memory['level'][str(level)][feature]['meth'], None) + _df[feature] = _engineering_meth(**_param) break + else: + for instruction in instructions: + _feature: str = list(instruction.keys())[0] + if instruction[_feature]['meth'] == 'one_hot_encoder': + _df_one_hot: pd.DataFrame = self.one_hot_decoder(feature_name=_feature) + _df = pd.concat(objs=[_df, _df_one_hot], axis=1) + elif instruction[_feature]['meth'] == 'label_encoder': + _df[_feature] = self.re_labeling(feature_name=_feature) + elif instruction[_feature]['meth'] in ENGINEERING_SCALING_PROCESSOR: + _df[_feature] = self.re_scaler(feature_name=_feature) + else: + _param: dict = dict(feature_name=instruction[_feature]['feature'], + interaction_feature_name=instruction[_feature].get('interactor') + ) + _engineering_meth = getattr(self, instruction[_feature]['meth'], None) + _df[_feature] = _engineering_meth(**_param) return _df + def re_labeling(self, feature_name: str) -> np.ndarray: + """ + Re-encoding of category labels based on fitted encoder processor + + :param feature_name: str + Name of the feature + + :return: np.ndarray + Re-encoding categorical feature + """ + _df: pd.DataFrame = pd.DataFrame() + _df[f'{feature_name}_enc'] = self.df[feature_name].values + _df[f'{feature_name}_enc'].replace(to_replace=self.processing_memory['processor'][feature_name], inplace=True) + return _df[f'{feature_name}_enc'].values + def re_scaler(self, feature_name: str) -> np.ndarray: """ Re-scale feature based on fitted scaling processor