diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/sampler.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/sampler.py
index 995ca0d..16871e0 100644
--- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/sampler.py
+++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/sampler.py
@@ -150,6 +150,19 @@ def k_fold_cross_validation(self, k: int = 5) -> dict:
             _counter += 1
         return _kfold_sample
 
+    def time_series_sampling(self) -> dict:
+        """
+        Time series data sampling into train & test data
+
+        :return: dict:
+            Train and test split for both target and predictors
+        """
+        if self.time_series_feature is None or self.time_series_feature not in self.df.columns:
+            raise MLSamplerException(f'Time series feature ({self.time_series_feature}) not found in data set')
+        self.df.sort_values(by=self.time_series_feature, axis=0, ascending=True, inplace=True)
+        self.random_sample = False
+        return self.train_test_sampling()
+
     def train_test_sampling(self) -> dict:
         """
         Data sampling into train & test data
@@ -194,19 +207,6 @@ def train_test_sampling(self) -> dict:
             y_val=_y_val
         )
 
-    def time_series_sampling(self) -> dict:
-        """
-        Timeseries data sampling into train & test data
-
-        :return: dict:
-            Train and test split for both target and predictors
-        """
-        if self.time_series_feature is None or self.time_series_feature not in self.df.columns:
-            raise MLSamplerException(f'Time series feature ({self.time_series_feature}) not found in data set')
-        self.df.sort_values(by=self.time_series_feature, axis=1, ascending=True, inplace=True)
-        self.random_sample = False
-        return self.train_test_sampling()
-
     def up_sampling(self, target_class_value: Union[str, int], target_proportion: float) -> pd.DataFrame:
         """
         Up sample specific ranges of target values
@@ -243,7 +243,7 @@ class Sampler:
     """
     Class for general sampling purposes
     """
-    def __init__(self, df, size: int = None, prop: float = None, **kwargs):
+    def __init__(self, df, size: int = None, prop: float = None):
         """
         :param df: Pandas DataFrame
             Data set
@@ -253,9 +253,6 @@ def __init__(self, df, size: int = None, prop: float = None, **kwargs):
 
         :param prop: float
             Sample proportion
-
-        :param kwargs: dict
-            Key-word arguments for handling dask parameter settings
         """
        self.df: pd.DataFrame = df
        if size is None:
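The relocated `time_series_sampling` sorts the rows chronologically (`axis=0`; `axis=1` would attempt to sort columns and raise a `KeyError` for a column label) and switches off random sampling before delegating to `train_test_sampling`, so the test partition is always the most recent slice of the data. A minimal standalone sketch of that behaviour; the column names and sizes are invented and this is not code from the repository:

```python
import pandas as pd

# Toy frame with a datetime feature; 'ts' and 'y' are illustrative names.
df = pd.DataFrame({'ts': pd.date_range(start='2023-01-01', periods=10, freq='D'),
                   'y': range(10)
                   })
# axis=0 sorts the rows by the values of the 'ts' column.
df.sort_values(by='ts', axis=0, ascending=True, inplace=True)
# Non-random split: earlier observations train the model, later ones test it.
_cut: int = int(len(df) * 0.8)
train_df, test_df = df.iloc[:_cut], df.iloc[_cut:]
print(train_df.shape, test_df.shape)  # (8, 2) (2, 2)
```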
diff --git a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/task.py b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/task.py
index 84e654e..0a385f2 100644
--- a/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/task.py
+++ b/kubeflow_templates/kubeflow_pipelines/task_pool/v1/sampling/task_container/src/task.py
@@ -1,47 +1,114 @@
 """
+Task: ...
+(Function to run in container)
 """
-import boto3
-import json
-import os
+import argparse
+import json
 import pandas as pd
-from sampler import MLSampler
-from typing import NamedTuple, List
-
-
-def train_test_split(data_set_file_path: str,
-                     target_feature_name: str,
-                     output_file_path_train_test_split_data: str,
-                     output_file_path_sampling_metadata: str,
-                     output_bucket_name: str = None,
-                     features: List[str] = None,
-                     train_size: float = 0.8,
-                     validation_size: float = 0.1,
-                     random_sample: bool = True,
-                     sep: str = ',',
-                     seed: int = 1234
-                     ) -> NamedTuple('outputs', [('train_data_set_path', str),
-                                                 ('test_data_set_path', str),
-                                                 ('val_data_set_path', str),
-                                                 ('metadata', dict)
-                                                 ]):
+from aws import save_file_to_s3
+from custom_logger import Log
+from file_handler import file_handler
+from sampler import MLSampler, Sampler
+from typing import Dict, List, NamedTuple
+
+SAMPLING_METH: List[str] = ['quota', 'random']
+ML_CLF_SAMPLING_METH: List[str] = ['down', 'up']
+ML_SAMPLING_METH: List[str] = ['train_test', 'train_test_time_series']
+
+PARSER = argparse.ArgumentParser(description="data sampling")
+PARSER.add_argument('-action', type=str, required=True, default=None, help='sampling action')
+PARSER.add_argument('-data_set_file_path', type=str, required=True, default=None, help='complete file path to the data set')
+PARSER.add_argument('-target_feature', type=str, required=True, default=None, help='name of the target feature')
+PARSER.add_argument('-features', nargs='+', required=False, default=None, help='names of the features')
+PARSER.add_argument('-time_series_feature', type=str, required=False, default=None, help='name of the time series feature')
+PARSER.add_argument('-train_size', type=float, required=False, default=0.8, help='size of the training data set')
+PARSER.add_argument('-validation_size', type=float, required=False, default=0.1, help='size of the validation data set')
+PARSER.add_argument('-random_sample', type=int, required=False, default=1, help='whether to sample randomly or not')
+PARSER.add_argument('-target_class_value', type=int, required=False, default=None, help='target class value to sample')
+PARSER.add_argument('-target_proportion', type=float, required=False, default=None, help='target proportion of class value')
+PARSER.add_argument('-size', type=int, required=False, default=None, help='size of the sampled data set')
+PARSER.add_argument('-prop', type=float, required=False, default=None, help='proportion of the sampled data set')
+PARSER.add_argument('-quotas', type=json.loads, required=False, default=None, help='pre-defined quota configuration for sampling (JSON string)')
+PARSER.add_argument('-sep', type=str, required=False, default=',', help='column separator')
+PARSER.add_argument('-output_file_path_sampling_metadata', type=str, required=True, default=None, help='complete file path of the metadata output')
+PARSER.add_argument('-s3_output_file_path_train_data_set', type=str, required=False, default=None, help='complete file path of the training data set output')
+PARSER.add_argument('-s3_output_file_path_test_data_set', type=str, required=False, default=None, help='complete file path of the test data set output')
+PARSER.add_argument('-s3_output_file_path_val_data_set', type=str, required=False, default=None, help='complete file path of the validation data set output')
+PARSER.add_argument('-s3_output_file_path_sampling_data_set', type=str, required=False, default=None, help='S3 file path of the sampled data set output')
+PARSER.add_argument('-s3_output_file_path_sampling_metadata', type=str, required=False, default=None, help='S3 file path of the sampling metadata output')
+ARGS = PARSER.parse_args()
+
+
+class SamplingException(Exception):
+    """
+    Class for handling exceptions for function sampling
+    """
+    pass
+
+
+def sampling(action: str,
+             data_set_file_path: str,
+             target_feature: str,
+             output_file_path_sampling_metadata: str = None,
+             s3_output_file_path_train_data_set: str = None,
+             s3_output_file_path_test_data_set: str = None,
+             s3_output_file_path_val_data_set: str = None,
+             s3_output_file_path_sampling_data_set: str = None,
+             features: List[str] = None,
+             time_series_feature: str = None,
+             train_size: float = 0.8,
+             validation_size: float = 0.1,
+             random_sample: bool = True,
+             target_class_value: int = None,
+             target_proportion: float = None,
+             size: int = None,
+             prop: float = None,
+             quotas: Dict[str, Dict[str, float]] = None,
+             sep: str = ',',
+             s3_output_file_path_sampling_metadata: str = None,
+             ) -> NamedTuple('outputs', [('sampling_metadata', dict)]):
     """
     Sampling data sets for training, testing and validation used for applying supervised machine learning models
 
+    :param action: str
+        Name of the sampling action
+            -> random: Random sampling
+            -> quota: Quota based sampling
+            -> down: Down-sampling of class value
+            -> up: Up-sampling of class value
+            -> train_test: Train-test sampling for structured data
+            -> train_test_time_series: Train-test sampling for time series data
+
     :param data_set_file_path: str
         Complete file path of the data set
 
-    :param output_path: str
-        Path of the sample data sets
-
-    :param target_feature_name: str
+    :param target_feature: str
         Name of the target feature
 
+    :param output_file_path_sampling_metadata: str
+        Complete file path of the sampling metadata output
+
+    :param s3_output_file_path_train_data_set: str
+        Complete file path of the sampled training data set
+
+    :param s3_output_file_path_test_data_set: str
+        Complete file path of the sampled test data set
+
+    :param s3_output_file_path_val_data_set: str
+        Complete file path of the sampled validation data set
+
+    :param s3_output_file_path_sampling_data_set: str
+        Complete file path of the sampled data set
+
     :param features: List[str]
         Name of features to use
 
+    :param time_series_feature: str
+        Name of the datetime feature to use
+
     :param train_size: float
         Size of the training data set
 
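Two argument conversions above are easy to get wrong in argparse: `-features` collects a variable number of values via `nargs='+'` (a bare `type=list` would split a single string into characters), and `-quotas` arrives as a JSON string that `json.loads` decodes into the nested dict the function expects (`typing.Any` is not a callable converter). A self-contained check, with invented values, of how both behave:

```python
import argparse
import json

# Mirror of the two list/dict-valued arguments from the parser above.
parser = argparse.ArgumentParser()
parser.add_argument('-features', nargs='+', required=False, default=None)
parser.add_argument('-quotas', type=json.loads, required=False, default=None)

args = parser.parse_args(['-features', 'age', 'income',
                          '-quotas', '{"gender": {"male": 0.6, "female": 0.4}}'
                          ])
print(args.features)  # ['age', 'income']
print(args.quotas)    # {'gender': {'male': 0.6, 'female': 0.4}}
```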
@@ -51,59 +118,113 @@ def train_test_split(data_set_file_path: str,
     :param random_sample: bool
         Whether to sample randomly or not
 
+    :param target_class_value: int
+        Class value of the target feature to sample
+
+    :param target_proportion: float
+        Target proportion of the class value of the target feature
+
+    :param size: int
+        Sample size
+
+    :param prop: float
+        Proportion of the sample size
+
+    :param quotas: Dict[str, Dict[str, float]]
+        Pre-defined quota config used for quota sampling
+
     :param sep: str
         Separator
 
-    :param seed: int
-        Seed value
+    :param s3_output_file_path_sampling_metadata: str
+        Complete file path of the sampling metadata
 
     :return: NamedTuple
-        Path of the sampled data sets and metadata about each data set
+        Metadata about the sampled data sets
     """
     _df: pd.DataFrame = pd.read_csv(filepath_or_buffer=data_set_file_path, sep=sep)
-    _ml_sampler: MLSampler = MLSampler(df=_df,
-                                       target=target_feature_name,
-                                       features=features,
-                                       train_size=train_size,
-                                       random_sample=random_sample,
-                                       stratification=False,
-                                       seed=seed
-                                       )
-    _train_test_split: dict = _ml_sampler.train_test_sampling(validation_split=validation_size)
-    _train_df: pd.DataFrame = _train_test_split.get('x_train')
-    _train_df[target_feature_name] = _train_test_split.get('y_train')
-    _train_data_set_path: str = os.path.join(output_file_path_train_test_split_data, 'train.csv')
-    _train_df.to_csv(path_or_buf=_train_data_set_path, sep=sep, header=True, index=False)
-    _test_df: pd.DataFrame = _train_test_split.get('x_test')
-    _test_df[target_feature_name] = _train_test_split.get('y_test')
-    _test_data_set_path: str = os.path.join(output_file_path_train_test_split_data, 'test.csv')
-    _test_df.to_csv(path_or_buf=_test_data_set_path, sep=sep, header=True, index=False)
-    _sampling_metadata: dict = dict(n_features=_train_df.shape[1] - 1,
-                                    n_cases={'train': _train_df.shape[0],
-                                             'test': _test_df.shape[0],
-                                             }
-                                    )
-    if _train_test_split.get('x_val') is not None and _train_test_split.get('y_val') is not None:
-        _val_df: pd.DataFrame = _train_test_split.get('x_val')
-        _val_df[target_feature_name] = _train_test_split.get('y_val')
-        _val_data_set_path: str = os.path.join(output_file_path_train_test_split_data, 'val.csv')
-        _val_df.to_csv(path_or_buf=_val_data_set_path, sep=sep, header=True, index=False)
-        _sampling_metadata['n_cases'].update({'val': _val_df.shape[0]})
+    if action in SAMPLING_METH or action in ML_CLF_SAMPLING_METH:
+        if action in SAMPLING_METH:
+            _sampler: Sampler = Sampler(df=_df, size=size, prop=prop)
+        else:
+            _sampler: MLSampler = MLSampler(df=_df,
+                                            target=target_feature,
+                                            features=features,
+                                            time_series_feature=time_series_feature,
+                                            train_size=train_size,
+                                            validation_size=validation_size,
+                                            random_sample=random_sample,
+                                            stratification=False
+                                            )
+        if action == 'quota':
+            _sampled_df: pd.DataFrame = _sampler.quota(features=features, quotas=quotas)
+        elif action == 'random':
+            _sampled_df: pd.DataFrame = _sampler.random()
+        elif action == 'down':
+            _sampled_df = _sampler.down_sampling(target_class_value=target_class_value, target_proportion=target_proportion)
+        else:
+            _sampled_df = _sampler.up_sampling(target_class_value=target_class_value, target_proportion=target_proportion)
+        _sampling_metadata: dict = dict(n_features=_sampled_df.shape[1], n_cases={action: _sampled_df.shape[0]})
+        _sampled_df.to_csv(path_or_buf=s3_output_file_path_sampling_data_set, header=True, index=False, sep=sep)
+        Log().log(msg=f'Save {action} sampled data set: {s3_output_file_path_sampling_data_set}')
+    elif action in ML_SAMPLING_METH:
+        _ml_sampler: MLSampler = MLSampler(df=_df,
+                                           target=target_feature,
+                                           features=features,
+                                           time_series_feature=time_series_feature,
+                                           train_size=train_size,
+                                           validation_size=validation_size,
+                                           random_sample=random_sample,
+                                           stratification=False
+                                           )
+        if action == 'train_test':
+            _train_test_split: dict = _ml_sampler.train_test_sampling()
+        else:
+            _train_test_split: dict = _ml_sampler.time_series_sampling()
+        _train_df: pd.DataFrame = _train_test_split.get('x_train')
+        _train_df[target_feature] = _train_test_split.get('y_train')
+        _train_df.to_csv(path_or_buf=s3_output_file_path_train_data_set, sep=sep, header=True, index=False)
+        Log().log(msg=f'Save training data set: {s3_output_file_path_train_data_set}')
+        _test_df: pd.DataFrame = _train_test_split.get('x_test')
+        _test_df[target_feature] = _train_test_split.get('y_test')
+        _test_df.to_csv(path_or_buf=s3_output_file_path_test_data_set, sep=sep, header=True, index=False)
+        Log().log(msg=f'Save test data set: {s3_output_file_path_test_data_set}')
+        _sampling_metadata: dict = dict(n_features=_train_df.shape[1] - 1,
+                                        n_cases={'train': _train_df.shape[0], 'test': _test_df.shape[0]}
+                                        )
+        if _train_test_split.get('x_val') is not None and _train_test_split.get('y_val') is not None:
+            _val_df: pd.DataFrame = _train_test_split.get('x_val')
+            _val_df[target_feature] = _train_test_split.get('y_val')
+            _val_df.to_csv(path_or_buf=s3_output_file_path_val_data_set, sep=sep, header=True, index=False)
+            Log().log(msg=f'Save validation data set: {s3_output_file_path_val_data_set}')
+            _sampling_metadata['n_cases'].update({'val': _val_df.shape[0]})
     else:
-        _val_data_set_path: str = None
-    for file_path, obj in [(_train_data_set_path, _train_data_set_path),
-                           (_test_data_set_path, _test_data_set_path),
-                           (_val_data_set_path, _val_data_set_path),
-                           (output_file_path_sampling_metadata, _sampling_metadata)
-                           ]:
-        with open(file_path, 'w') as _file:
-            json.dump(obj, _file)
-    if output_bucket_name is not None:
-        _s3_resource: boto3 = boto3.resource('s3')
-        _s3_obj: _s3_resource.Object = _s3_resource.Object(output_bucket_name, output_file_path_sampling_metadata)
-        _s3_obj.put(Body=json.dumps(obj=_sampling_metadata))
-    return [_train_data_set_path,
-            _test_data_set_path,
-            _val_data_set_path,
-            _sampling_metadata
-            ]
+        raise SamplingException(f'Sampling action ({action}) not supported')
+    file_handler(file_path=output_file_path_sampling_metadata, obj=_sampling_metadata)
+    if s3_output_file_path_sampling_metadata is not None:
+        save_file_to_s3(file_path=s3_output_file_path_sampling_metadata, obj=_sampling_metadata)
+    return [_sampling_metadata]
+
+
+if __name__ == '__main__':
+    sampling(action=ARGS.action,
+             data_set_file_path=ARGS.data_set_file_path,
+             target_feature=ARGS.target_feature,
+             output_file_path_sampling_metadata=ARGS.output_file_path_sampling_metadata,
+             s3_output_file_path_train_data_set=ARGS.s3_output_file_path_train_data_set,
+             s3_output_file_path_test_data_set=ARGS.s3_output_file_path_test_data_set,
+             s3_output_file_path_val_data_set=ARGS.s3_output_file_path_val_data_set,
+             s3_output_file_path_sampling_data_set=ARGS.s3_output_file_path_sampling_data_set,
+             features=ARGS.features,
+             time_series_feature=ARGS.time_series_feature,
+             train_size=ARGS.train_size,
+             validation_size=ARGS.validation_size,
+             random_sample=bool(ARGS.random_sample),
+             target_class_value=ARGS.target_class_value,
+             target_proportion=ARGS.target_proportion,
+             size=ARGS.size,
+             prop=ARGS.prop,
+             quotas=ARGS.quotas,
+             sep=ARGS.sep,
+             s3_output_file_path_sampling_metadata=ARGS.s3_output_file_path_sampling_metadata
+             )
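For reference, a hypothetical local run of the new entry point. All paths, bucket and column names below are placeholders, and the CSV must contain the named target column; note that `task.py` calls `PARSER.parse_args()` at import time, so the required flags have to be stubbed onto `sys.argv` before importing it outside the container:

```python
import sys

# Stub the CLI first: the module-level parse_args() in task.py reads sys.argv
# on import and exits if the required flags are missing.
sys.argv = ['task.py',
            '-action', 'train_test',
            '-data_set_file_path', 'data.csv',
            '-target_feature', 'label',
            '-output_file_path_sampling_metadata', 'sampling_metadata.json'
            ]
from task import sampling

# 80/10/10 train/test/validation split; the metadata dict is also persisted
# to the given output path via file_handler.
_meta = sampling(action='train_test',
                 data_set_file_path='data.csv',
                 target_feature='label',
                 output_file_path_sampling_metadata='sampling_metadata.json',
                 s3_output_file_path_train_data_set='s3://bucket/train.csv',
                 s3_output_file_path_test_data_set='s3://bucket/test.csv',
                 s3_output_file_path_val_data_set='s3://bucket/val.csv',
                 train_size=0.8,
                 validation_size=0.1,
                 random_sample=True
                 )
# e.g. [{'n_features': 4, 'n_cases': {'train': 800, 'test': 100, 'val': 100}}]
```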