From 35aecc31a2a944b4326a729601295592eaeaf583 Mon Sep 17 00:00:00 2001 From: giannibalistreri Date: Tue, 26 Dec 2023 22:45:17 +0100 Subject: [PATCH] Refactoring --- .../task_container/src/data_health_check.py | 2 +- .../task_container/src/task.py | 4 +-- .../task_container/src/imputation.py | 33 ++++++++++++++++--- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/data_health_check.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/data_health_check.py index a9d6a9f..ea34d91 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/data_health_check.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/data_health_check.py @@ -43,7 +43,7 @@ def _convert_invalid_to_nan(self) -> None: """ for invalid in INVALID_VALUES: self.df.replace(to_replace=invalid, value=np.nan, inplace=True) - Log().log(msg='Converted invalid values to missing values (NaN)') + Log().log(msg=f'Converted invalid values ({invalid}) to missing values (NaN)') def _is_duplicated(self, feature_name: str) -> bool: """ diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py index 7714145..3d6a4b9 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/data_health_check/task_container/src/task.py @@ -17,13 +17,13 @@ PARSER = argparse.ArgumentParser(description="check data health") PARSER.add_argument('-data_set_path', type=str, required=True, default=None, help='file path of the data set') PARSER.add_argument('-analytical_data_types', type=Any, required=True, default=None, help='assignment of features to analytical data 
types') +PARSER.add_argument('-missing_value_threshold', type=float, required=False, default=0.95, help='threshold to classify features as invalid based on the amount of missing values') +PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator') PARSER.add_argument('-output_file_path_data_missing_data', type=str, required=True, default=None, help='file path of features containing too much missing data') PARSER.add_argument('-output_file_path_invariant', type=str, required=True, default=None, help='file path of invariant features') PARSER.add_argument('-output_file_path_duplicated', type=str, required=True, default=None, help='file path of duplicated features') PARSER.add_argument('-output_file_path_valid_features', type=str, required=True, default=None, help='file path of valid features') PARSER.add_argument('-output_file_path_prop_valid_features', type=str, required=True, default=None, help='file path of the proportion of valid features') -PARSER.add_argument('-missing_value_threshold', type=float, required=False, default=0.95, help='threshold to classify features as invalid based on the amount of missing values') -PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator') PARSER.add_argument('-s3_output_file_path_data_health_check', type=str, required=False, default=None, help='S3 file path of the data health check output') ARGS = PARSER.parse_args() diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/imputation/task_container/src/imputation.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/imputation/task_container/src/imputation.py index 1b51f5e..fddb180 100644 --- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/imputation/task_container/src/imputation.py +++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/imputation/task_container/src/imputation.py @@ -9,7 +9,7 @@ import pandas as pd from custom_logger import Log -from typing import List, Tuple, Union +from typing 
import Dict, List, Tuple, Union class ImputationException(Exception): @@ -30,11 +30,28 @@ def __init__(self, df: pd.DataFrame): """ self.imp_config: dict = {} self.df: pd.DataFrame = df + self.missing_value_count: int = 0 + self.missing_value_count_by_feature: Dict[str, int] = {} + self._check_missing_values() - def _mice(self): - pass + def _check_missing_values(self) -> None: + """ + Check which features contain missing values + + :return: + """ + _missing_value_count_df: pd.Series = pd.isnull(self.df[self.df.columns.tolist()]).astype(int).sum() + self.missing_value_count = _missing_value_count_df.sum() + self.missing_value_count_by_feature = _missing_value_count_df.to_dict() + Log().log(msg=f'Total number of missing values in data set: {self.missing_value_count}') + + def _mice(self) -> None: + """ + Multiple imputation by chained equations for imputing missing at random (MAR) patterns using predictions of machine learning models + """ + raise ImputationException('MICE algorithm not supported') - def _random(self, feature: str, m: int = 3, convergence_threshold: float = 0.99): + def _random(self, feature: str, m: int = 3, convergence_threshold: float = 0.99) -> np.ndarray: """ Multiple imputation using randomly generated values within range of observations @@ -135,6 +152,10 @@ def main(self, """ _imp_df: pd.DataFrame = pd.DataFrame() for feature in feature_names: + if self.missing_value_count_by_feature.get(feature) == 0: + _imp_df[feature] = self.df[feature] + Log().log(msg=f'No missing values of feature {feature} detected') + continue self.imp_config.update({feature: dict(imp_meth_type=imp_meth)}) if imp_meth == 'single': self.imp_config[feature].update({'imp_meth': single_meth}) @@ -160,8 +181,9 @@ def main(self, else: raise ImputationException(f'Single imputation method ({single_meth}) not supported') _std_diff: float = 1 - round(_imp_df[f'{feature}_imp'].std() / self.df[feature].std()) - _msg_element: str = 'in' if _std_diff > 0 else 'de' + _msg_element: str = 
'in' if _std_diff >= 0 else 'de' Log().log(msg=f'Variance of feature ({feature}) {_msg_element}creases by {_std_diff}%') + Log().log(msg=f'Missing values of feature {feature} imputed by applying single imputation method ({single_meth})') elif imp_meth == 'multiple': self.imp_config[feature].update({'imp_meth': multiple_meth, 'imp_value': []}) if multiple_meth == 'mice': @@ -170,6 +192,7 @@ def main(self, _imp_df[f'{feature}_imp'] = self._random(feature=feature, m=m, convergence_threshold=convergence_threshold) else: raise ImputationException(f'Multiple imputation method ({multiple_meth}) not supported') + Log().log(msg=f'Missing values of feature {feature} imputed by applying multiple imputation method ({multiple_meth})') else: raise ImputationException(f'Imputation method ({imp_meth}) not supported') return _imp_df