From ffb2e9d49726fa063fcb69829eab06144d3a6112 Mon Sep 17 00:00:00 2001
From: Rabah Abdul Khalek
Date: Fri, 21 Oct 2022 15:48:53 +0200
Subject: [PATCH] migrated saving and compressing utilities from old PR-106

---
 giskard/ml_worker/testing/utils.py | 95 ++++++++++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/giskard/ml_worker/testing/utils.py b/giskard/ml_worker/testing/utils.py
index 56eff71..087aac5 100644
--- a/giskard/ml_worker/testing/utils.py
+++ b/giskard/ml_worker/testing/utils.py
@@ -3,6 +3,16 @@
 from giskard.ml_worker.generated.ml_worker_pb2 import SingleTestResult
 from giskard.ml_worker.utils.logging import timer
+import os
+import re
+from io import BytesIO
+from typing import Optional
+
+import numpy as np
+import pandas as pd
+from zstandard import ZstdCompressor, ZstdDecompressor
+
+COMPRESSION_MAX_OUTPUT_SIZE = 10 ** 9  # 1 GB
 
 
 def ge_result_to_test_result(result, passed=True) -> SingleTestResult:
     """
@@ -41,3 +51,88 @@ def apply_perturbation_inplace(df: pd.DataFrame, perturbation_dict):
             df.loc[idx, pert_col] = new_value
         i += 1
     return modified_rows
+
+
+def save_df(df: pd.DataFrame, format: str = "csv") -> bytes:
+    pandas_version: int = int(re.sub(r"\D", "", pd.__version__))
+    if format == "csv":
+        csv_buffer = BytesIO()
+        if pandas_version >= 120:
+            # pandas >= 1.2.0 can write directly to a binary buffer
+            df.to_csv(csv_buffer, index=False)
+        else:
+            csv_buffer.write(df.to_csv(index=False).encode("utf-8"))
+        csv_buffer.seek(0)
+        return csv_buffer.getvalue()
+    else:
+        raise ValueError(f"Invalid format: {format}. Choose 'csv'.")
+
+
+def compress(data: bytes, method: Optional[str] = "zstd") -> bytes:
+    if method == "zstd":
+        compressor = ZstdCompressor(level=3, write_checksum=True)
+        compressed_data = compressor.compress(data)
+    elif method is None:
+        compressed_data = data
+    else:
+        raise ValueError(f"Invalid compression method: {method}. Choose 'zstd' or None.")
+    return compressed_data
+
+
+def decompress(
+    data: bytes, method: Optional[str] = "zstd", max_output_size: int = COMPRESSION_MAX_OUTPUT_SIZE
+) -> bytes:
+    if method == "zstd":
+        decompressor = ZstdDecompressor()
+        decompressed_data = decompressor.decompress(data, max_output_size=max_output_size)
+    elif method is None:
+        decompressed_data = data
+    else:
+        raise ValueError(f"Invalid compression method: {method}. Choose 'zstd' or None.")
+    return decompressed_data
+
+
+# TODO: Verify this function -- https://giskard.youtrack.cloud/issue/GSK-189/Numerical-Binning-for-drift-tests-not-working-properly
+def bin_numerical_values(data_series, labels=None, bins=None):
+    """
+    Bin numerical values into quantile partitions so that a numerical column
+    can be treated as a categorical one when computing drift.
+    """
+    if labels is not None and bins is not None:
+        # Convert the actual dataset based on the bins of the reference dataset
+
+        if data_series.min() < bins[0]:
+            # If the actual dataset has a value smaller than the minimum bin
+            # of the reference, it is added in a new bin with label -1
+            print(f"Minimum value {data_series.min()} of Actual Dataset has been appended to the bins")
+            bins = np.insert(bins, 0, data_series.min())
+            labels = np.insert(labels, 0, -1)
+
+        if data_series.max() > bins[-1]:
+            # If the actual dataset has a value larger than the maximum bin
+            # of the reference, it is added in a new bin with label 10
+            print(f"Maximum value {data_series.max()} of Actual Dataset has been appended to the bins")
+            bins = np.append(bins, data_series.max())
+            labels = np.append(labels, 10)
+
+        converted_series = pd.cut(data_series, bins=bins, labels=labels, include_lowest=True)
+        labels = None
+        bins = None
+    else:
+        # Convert the reference dataset into quantile bins (10 by default)
+
+        # The bin count can be overridden by setting GSK_TEST_DRIFT_BIN_COUNT in the environment
+        num_quantile = int(os.environ.get('GSK_TEST_DRIFT_BIN_COUNT', 10))
+
+        num_unique_values = len(np.unique(data_series))
+        if num_unique_values < num_quantile:  # If the given reference set is too small
+            labels = range(num_unique_values - 1)
+
+            # Every unique value in the dataset becomes its own bin
+            converted_series, bins = data_series, sorted(data_series.unique())
+        else:
+            converted_series, bins = pd.qcut(data_series, num_quantile, labels=range(num_quantile),
+                                             duplicates='drop', retbins=True)
+            labels = range(num_quantile)
+
+    return converted_series, labels, bins
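
A minimal round-trip sketch of the serialization helpers added above (illustrative
only, not part of the patch; it assumes the patched module is importable as
giskard.ml_worker.testing.utils and that the zstandard package is installed; the
DataFrame contents are made up):

    from io import BytesIO

    import pandas as pd

    from giskard.ml_worker.testing.utils import compress, decompress, save_df

    df = pd.DataFrame({"age": [22, 35, 58], "income": [1800, 3200, 5400]})

    raw = save_df(df, format="csv")          # DataFrame -> CSV bytes
    blob = compress(raw, method="zstd")      # CSV bytes -> zstd frame with checksum
    restored = decompress(blob, method="zstd")

    assert restored == raw
    assert pd.read_csv(BytesIO(restored)).equals(df)

Passing method=None to compress/decompress leaves the bytes untouched, which keeps
call sites uniform when compression is disabled.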
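
A sketch of the intended two-step usage of bin_numerical_values for drift tests
(also illustrative: the two series below are made-up data, and the GSK-189 TODO
above still applies to the function itself):

    import numpy as np
    import pandas as pd

    from giskard.ml_worker.testing.utils import bin_numerical_values

    reference = pd.Series(np.random.default_rng(0).normal(size=1000))
    actual = pd.Series(np.random.default_rng(1).normal(loc=0.5, size=1000))

    # First call: derive quantile bins and labels from the reference series
    ref_binned, labels, bins = bin_numerical_values(reference)

    # Second call: reuse those bins to categorize the actual series; values
    # beyond the reference min/max go into the extra -1 and 10 bins
    act_binned, _, _ = bin_numerical_values(actual, labels=labels, bins=bins)

    print(ref_binned.value_counts().sort_index())
    print(act_binned.value_counts().sort_index())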