Skip to content

Commit

Permalink
Numerical column stats update (#1089)
Browse files Browse the repository at this point in the history
* partial update to numerical_column_stats

* update with full polars replacement

* reduce redundant if statement

* fix histogram warning

* remove unneeded casting
  • Loading branch information
atl1502 committed Apr 15, 2024
1 parent d62cf8d commit 8e294f7
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 41 deletions.
147 changes: 106 additions & 41 deletions dataprofiler/profilers/numerical_column_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import numpy as np
import numpy.typing as npt
import pandas as pd
import polars as pl
import scipy.stats

from . import float_column_profile, histogram_utils, profiler_utils
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(self, options: NumericalOptions = None) -> None:
self.num_zeros: int | np.int64 = np.int64(0)
self.num_negatives: int | np.int64 = np.int64(0)
self._num_quantiles: int = 1000 # By default, we use 1000 quantiles
self._greater_than_64_bit: bool = False

if options:
self.bias_correction = options.bias_correction.is_enabled
Expand Down Expand Up @@ -1125,10 +1127,12 @@ def _estimate_stats_from_histogram(self) -> np.float64:
def _total_histogram_bin_variance(
self, input_array: np.ndarray | pd.Series
) -> float:
if type(input_array) is pd.Series:
input_array = pl.from_pandas(input_array)
input_array = input_array.to_numpy()
# calculate total variance over all bins of a histogram
bin_counts = self._stored_histogram["histogram"]["bin_counts"]
bin_edges = self._stored_histogram["histogram"]["bin_edges"]

# account ofr digitize which is exclusive
bin_edges = bin_edges.copy()
bin_edges[-1] += 1e-3
Expand All @@ -1151,6 +1155,9 @@ def _histogram_bin_error(self, input_array: np.ndarray | pd.Series) -> np.float6
:return: binning error
:rtype: float
"""
if type(input_array) is pd.Series:
input_array = pl.from_pandas(input_array)
input_array = input_array.to_numpy()
bin_edges = self._stored_histogram["histogram"]["bin_edges"]

# account ofr digitize which is exclusive
Expand Down Expand Up @@ -1265,7 +1272,7 @@ def _histogram_to_array(self) -> np.ndarray:
return array_flatten

def _get_histogram(
self, values: np.ndarray | pd.Series
self, values: np.ndarray | pl.Series
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate stored histogram the suggested bin counts for each histogram method.
Expand All @@ -1278,10 +1285,7 @@ def _get_histogram(
"""
if len(np.unique(values)) == 1:
bin_counts = np.array([len(values)])
if isinstance(values, (np.ndarray, list)):
unique_value = values[0]
else:
unique_value = values.iloc[0]
unique_value = values[0]
bin_edges = np.array([unique_value, unique_value])
for bin_method in self.histogram_bin_method_names:
self.histogram_methods[bin_method]["histogram"][
Expand Down Expand Up @@ -1322,12 +1326,15 @@ def _get_histogram(
def _merge_histogram(self, values: np.ndarray | pd.Series) -> None:
# values is the current array of values,
# that needs to be updated to the accumulated histogram
if type(values) is pd.Series:
values = pl.from_pandas(values)
values = values.to_numpy()
combined_values = np.concatenate([values, self._histogram_to_array()])
bin_counts, bin_edges = self._get_histogram(combined_values)
self._stored_histogram["histogram"]["bin_counts"] = bin_counts
self._stored_histogram["histogram"]["bin_edges"] = bin_edges

def _update_histogram(self, df_series: pd.Series) -> None:
def _update_histogram(self, df_series: pd.Series | np.ndarray) -> None:
"""
Update histogram for each method and the combined method.
Expand All @@ -1348,12 +1355,23 @@ def _update_histogram(self, df_series: pd.Series) -> None:
:type df_series: pandas.core.series.Series
:return:
"""
df_series = df_series.replace([np.inf, -np.inf], np.nan).dropna()
if df_series.empty:
return
if self._greater_than_64_bit and type(df_series) is pd.Series:
df_series = df_series.to_numpy(dtype=float)
df_series = df_series[np.isfinite(df_series)]
if df_series.size == 0:
return
else:
df_series = pl.from_pandas(df_series, nan_to_null=True).cast(pl.Float64)
df_series = df_series.replace([np.inf, -np.inf], [None]) # type: ignore
df_series = df_series.drop_nulls()
if df_series.is_empty():
return

if self._has_histogram:
self._merge_histogram(df_series.tolist())
if self._greater_than_64_bit:
self._merge_histogram(df_series.tolist())
else:
self._merge_histogram(df_series.to_list())
else:
bin_counts, bin_edges = self._get_histogram(df_series)
self._stored_histogram["histogram"]["bin_counts"] = bin_counts
Expand Down Expand Up @@ -1741,8 +1759,26 @@ def _update_helper(self, df_series_clean: pd.Series, profile: dict) -> None:
:type profile: dict
:return: None
"""
if df_series_clean.empty:
return
self._greater_than_64_bit = (
not df_series_clean.empty
and df_series_clean.apply(pd.to_numeric, errors="coerce").dtype == "O"
)
if self._greater_than_64_bit:
df_series_clean = df_series_clean.to_numpy()
df_series_clean = df_series_clean[df_series_clean != np.nan]
if df_series_clean.size == 0:
return
df_series_clean = pd.Series(df_series_clean)
else:
df_series_clean = pl.from_pandas(df_series_clean)
if df_series_clean.dtype == pl.String:
df_series_clean = df_series_clean.str.strip_chars().cast(pl.Float64)
else:
df_series_clean = df_series_clean.cast(pl.Float64)
if df_series_clean.is_empty():
return
df_series_clean = df_series_clean.to_pandas()
df_series_clean = df_series_clean.astype(float)

prev_dependent_properties = {
"mean": self.mean,
Expand All @@ -1751,7 +1787,6 @@ def _update_helper(self, df_series_clean: pd.Series, profile: dict) -> None:
"biased_kurtosis": self._biased_kurtosis,
}
subset_properties = copy.deepcopy(profile)
df_series_clean = df_series_clean.astype(float)
super()._perform_property_calcs( # type: ignore
self.__calculations,
df_series=df_series_clean,
Expand All @@ -1765,63 +1800,89 @@ def _update_helper(self, df_series_clean: pd.Series, profile: dict) -> None:
@BaseColumnProfiler._timeit(name="min")
def _get_min(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
min_value = df_series.min()
self.min = min_value if not self.min else min(self.min, min_value)
if self._greater_than_64_bit:
min_value = np.min(df_series)
self.min = min_value if not self.min else min(self.min, min_value)
else:
df_series = pl.from_pandas(df_series)
min_value = df_series.min()
self.min = np.float64(
min_value if not self.min else min(self.min, min_value)
)
subset_properties["min"] = min_value

@BaseColumnProfiler._timeit(name="max")
def _get_max(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
max_value = df_series.max()
self.max = max_value if not self.max else max(self.max, max_value)
if self._greater_than_64_bit:
max_value = np.max(df_series)
self.max = max_value if not self.max else max(self.max, max_value)
else:
df_series = pl.from_pandas(df_series)
max_value = df_series.max()
if self.max is not None:
max_value = type(self.max)(max_value)
self.max = np.float64(
max_value if not self.max else max(self.max, max_value)
)
subset_properties["max"] = max_value

@BaseColumnProfiler._timeit(name="sum")
def _get_sum(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
if np.isinf(self.sum) or (np.isnan(self.sum) and self.match_count > 0):
return

sum_value = df_series.sum()
if np.isinf(sum_value) or (len(df_series) > 0 and np.isnan(sum_value)):
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)
if self._greater_than_64_bit:
sum_value = np.sum(df_series)
if len(df_series) > 0 and sum_value == np.nan:
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)
else:
df_series = pl.from_pandas(df_series)
sum_value = df_series.sum()
if np.isinf(sum_value) or (len(df_series) > 0 and np.isnan(sum_value)):
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)

subset_properties["sum"] = sum_value
self.sum = self.sum + sum_value

@BaseColumnProfiler._timeit(name="variance")
def _get_variance(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
if np.isinf(self._biased_variance) or (
np.isnan(self._biased_variance) and self.match_count > 0
):
return

# Suppress any numpy warnings as we have a custom warning for invalid
# or infinite data already
with np.errstate(all="ignore"):
batch_biased_variance = np.var(df_series) # Obtains biased variance
if self._greater_than_64_bit:
batch_biased_variance = np.var(df_series)
else:
df_series = pl.from_pandas(df_series)
batch_biased_variance = np.var([df_series])
subset_properties["biased_variance"] = batch_biased_variance
sum_value = subset_properties["sum"]
batch_count = subset_properties["match_count"]
Expand All @@ -1839,7 +1900,7 @@ def _get_variance(
@BaseColumnProfiler._timeit(name="skewness")
def _get_skewness(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
Expand Down Expand Up @@ -1883,7 +1944,7 @@ def _get_skewness(
@BaseColumnProfiler._timeit(name="kurtosis")
def _get_kurtosis(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
Expand Down Expand Up @@ -1930,7 +1991,7 @@ def _get_kurtosis(
@BaseColumnProfiler._timeit(name="histogram_and_quantiles")
def _get_histogram_and_quantiles(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
Expand All @@ -1948,7 +2009,7 @@ def _get_histogram_and_quantiles(
@BaseColumnProfiler._timeit(name="num_zeros")
def _get_num_zeros(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
Expand All @@ -1963,14 +2024,16 @@ def _get_num_zeros(
:type subset_properties: dict
:return: None
"""
if not self._greater_than_64_bit:
df_series = pl.from_pandas(df_series)
num_zeros_value = (df_series == 0).sum()
subset_properties["num_zeros"] = num_zeros_value
self.num_zeros = self.num_zeros + num_zeros_value

@BaseColumnProfiler._timeit(name="num_negatives")
def _get_num_negatives(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
Expand All @@ -1985,6 +2048,8 @@ def _get_num_negatives(
:type subset_properties: dict
:return: None
"""
if not self._greater_than_64_bit:
df_series = pl.from_pandas(df_series)
num_negatives_value = (df_series < 0).sum()
subset_properties["num_negatives"] = num_negatives_value
self.num_negatives = self.num_negatives + num_negatives_value
Expand Down
2 changes: 2 additions & 0 deletions dataprofiler/tests/profilers/test_float_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
Expand Down Expand Up @@ -1890,6 +1891,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 1,
"num_negatives": 0,
"_num_quantiles": 4,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 2.0,
Expand Down
2 changes: 2 additions & 0 deletions dataprofiler/tests/profilers/test_int_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
Expand Down Expand Up @@ -1233,6 +1234,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 1,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": {
"custom": {
"total_loss": 0.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
Expand Down
1 change: 1 addition & 0 deletions dataprofiler/tests/profilers/test_text_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": {
"custom": {
"total_loss": 0.0,
Expand Down

0 comments on commit 8e294f7

Please sign in to comment.