This repository has been archived by the owner on May 7, 2023. It is now read-only.

Migration and development of output_df from old PR-106 #38

Draft · wants to merge 1 commit into base: main
migrated saving and compressing utilities from old PR-106
rabah-khalek committed Oct 21, 2022

Verified: This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit ffb2e9d49726fa063fcb69829eab06144d3a6112
94 changes: 94 additions & 0 deletions giskard/ml_worker/testing/utils.py
@@ -3,6 +3,16 @@
from giskard.ml_worker.generated.ml_worker_pb2 import SingleTestResult
from giskard.ml_worker.utils.logging import timer

import os
import re
from io import BytesIO
from typing import Optional

import numpy as np
import pandas as pd
from zstandard import ZstdCompressor, ZstdDecompressor

COMPRESSION_MAX_OUTPUT_SIZE = 10 ** 9  # 1 GB


def ge_result_to_test_result(result, passed=True) -> SingleTestResult:
"""
@@ -41,3 +51,87 @@ def apply_perturbation_inplace(df: pd.DataFrame, perturbation_dict):
            df.loc[idx, pert_col] = new_value
        i += 1
    return modified_rows

def save_df(df: pd.DataFrame, format: str = "csv") -> bytes:
    # Crude version check: strip non-digits, e.g. "1.2.0" -> 120
    pandas_version: int = int(re.sub(r"\D", "", pd.__version__))
    if format == "csv":
        csv_buffer = BytesIO()
        if pandas_version >= 120:
            # pandas >= 1.2 can write CSV directly to a binary buffer
            df.to_csv(csv_buffer, index=False)
        else:
            csv_buffer.write(df.to_csv(index=False).encode("utf-8"))
        csv_buffer.seek(0)
        return csv_buffer.getvalue()
    else:
        raise ValueError(f"Invalid format: {format}. Choose 'csv'.")
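
A minimal round-trip sketch of how save_df might be exercised (the sample DataFrame is illustrative, not part of the PR):

import pandas as pd
from io import BytesIO

df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})  # hypothetical data
raw = save_df(df)                      # CSV-encoded bytes
restored = pd.read_csv(BytesIO(raw))   # round-trip back to a DataFrame
assert restored.equals(df)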


def compress(data: bytes, method: Optional[str] = "zstd") -> bytes:
    if method == "zstd":
        compressor = ZstdCompressor(level=3, write_checksum=True)
        compressed_data = compressor.compress(data)
    elif method is None:
        compressed_data = data
    else:
        raise ValueError(f"Invalid compression method: {method}. Choose 'zstd' or None.")

    return compressed_data


def decompress(
    data: bytes, method: Optional[str] = "zstd", max_output_size: int = COMPRESSION_MAX_OUTPUT_SIZE
) -> bytes:
    if method == "zstd":
        decompressor = ZstdDecompressor()
        decompressed_data = decompressor.decompress(data, max_output_size=max_output_size)
    elif method is None:
        decompressed_data = data
    else:
        raise ValueError(f"Invalid compression method: {method}. Choose 'zstd' or None.")
    return decompressed_data
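
A short round-trip sketch combining both helpers (the payload is illustrative):

payload = b"some bytes worth shipping" * 1000

packed = compress(payload)             # zstd level 3, checksum embedded
assert decompress(packed) == payload

# method=None is a pass-through in both directions
assert decompress(compress(payload, method=None), method=None) == payload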


# TODO: Verify this function -- https://giskard.youtrack.cloud/issue/GSK-189/Numerical-Binning-for-drift-tests-not-working-properly
def bin_numerical_values(data_series, labels=None, bins=None):
    """
    Bin numerical values into quantile partitions so a numerical column can be
    treated as categorical when computing drift
    """
    if labels is not None and bins is not None:
        # Convert the actual dataset based on the bins of the reference dataset

        if data_series.min() < bins[0]:
            # If the actual dataset has a value smaller than the smallest reference
            # bin edge, extend the bins and capture it under the label -1
            print(f"Minimum value {data_series.min()} of the actual dataset has been appended to the bins")
            bins = np.insert(bins, 0, data_series.min())
            labels = np.insert(labels, 0, -1)

        if data_series.max() > bins[-1]:
            # If the actual dataset has a value larger than the largest reference
            # bin edge, extend the bins and capture it under the label 10
            print(f"Maximum value {data_series.max()} of the actual dataset has been appended to the bins")
            bins = np.append(bins, data_series.max())
            labels = np.append(labels, 10)

        converted_series = pd.cut(data_series, bins=bins, labels=labels, include_lowest=True)
        labels = None
        bins = None
    else:
        # Convert the reference dataset into 10 quantiles

        # Users can override the bin count by setting GSK_TEST_DRIFT_BIN_COUNT
        # (e.g. in the Docker environment); cast to int since env vars are strings
        num_quantile = int(os.environ.get('GSK_TEST_DRIFT_BIN_COUNT', 10))

        num_unique_values = len(np.unique(data_series))
        if num_unique_values < num_quantile:  # If the given reference set is too small
            labels = range(num_unique_values - 1)

            # Every unique value in the dataset becomes its own bin edge
            converted_series, bins = data_series, sorted(data_series.unique())
        else:
            converted_tuple = pd.qcut(data_series, num_quantile, labels=range(num_quantile), duplicates='drop',
                                      retbins=True)
            converted_series, bins = converted_tuple
            labels = range(num_quantile)

    return converted_series, labels, bins
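
A sketch of the intended two-step call pattern, first deriving bins from a reference series and then reusing them on an actual series (the random series are made up for illustration; given the TODO above, treat the output as indicative only):

import numpy as np
import pandas as pd

reference = pd.Series(np.random.default_rng(0).normal(size=1000))
actual = pd.Series(np.random.default_rng(1).normal(size=1000))

# Step 1: derive quantile bins and labels from the reference series
ref_binned, labels, bins = bin_numerical_values(reference)

# Step 2: reuse those labels/bins to categorise the actual series
act_binned, _, _ = bin_numerical_values(actual, labels=labels, bins=bins)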