Numerical column stats update #1089

Merged
147 changes: 106 additions & 41 deletions dataprofiler/profilers/numerical_column_stats.py
@@ -11,6 +11,7 @@
import numpy as np
import numpy.typing as npt
import pandas as pd
import polars as pl
import scipy.stats

from . import float_column_profile, histogram_utils, profiler_utils
@@ -83,6 +84,7 @@ def __init__(self, options: NumericalOptions = None) -> None:
self.num_zeros: int | np.int64 = np.int64(0)
self.num_negatives: int | np.int64 = np.int64(0)
self._num_quantiles: int = 1000 # By default, we use 1000 quantiles
self._greater_than_64_bit: bool = False

if options:
self.bias_correction = options.bias_correction.is_enabled
@@ -1125,10 +1127,12 @@ def _estimate_stats_from_histogram(self) -> np.float64:
def _total_histogram_bin_variance(
self, input_array: np.ndarray | pd.Series
) -> float:
if type(input_array) is pd.Series:
input_array = pl.from_pandas(input_array)
input_array = input_array.to_numpy()
# calculate total variance over all bins of a histogram
bin_counts = self._stored_histogram["histogram"]["bin_counts"]
bin_edges = self._stored_histogram["histogram"]["bin_edges"]

# account for digitize which is exclusive
bin_edges = bin_edges.copy()
bin_edges[-1] += 1e-3
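
A note on the conversion guard above: helpers that still accept a pandas Series now route it through polars before the numpy math instead of calling Series.to_numpy() directly. A minimal sketch of the pattern (the helper name and sample data are illustrative, not from the diff):

    import numpy as np
    import pandas as pd
    import polars as pl

    def to_numpy_via_polars(values):
        # Mirrors the guard above: pandas input is converted to a
        # polars Series first, then to a plain numpy array.
        if type(values) is pd.Series:
            values = pl.from_pandas(values)
            values = values.to_numpy()
        return np.asarray(values)

    to_numpy_via_polars(pd.Series([1.0, 2.0, 3.0]))  # array([1., 2., 3.])
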
@@ -1151,6 +1155,9 @@ def _histogram_bin_error(self, input_array: np.ndarray | pd.Series) -> np.float64:
:return: binning error
:rtype: float
"""
if type(input_array) is pd.Series:
input_array = pl.from_pandas(input_array)
input_array = input_array.to_numpy()
bin_edges = self._stored_histogram["histogram"]["bin_edges"]

# account for digitize which is exclusive
@@ -1265,7 +1272,7 @@ def _histogram_to_array(self) -> np.ndarray:
return array_flatten

def _get_histogram(
self, values: np.ndarray | pd.Series
self, values: np.ndarray | pl.Series
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate the stored histogram and the suggested bin counts for each histogram method.
@@ -1278,10 +1285,7 @@
"""
if len(np.unique(values)) == 1:
bin_counts = np.array([len(values)])
if isinstance(values, (np.ndarray, list)):
unique_value = values[0]
else:
unique_value = values.iloc[0]
unique_value = values[0]
bin_edges = np.array([unique_value, unique_value])
for bin_method in self.histogram_bin_method_names:
self.histogram_methods[bin_method]["histogram"][
@@ -1322,12 +1326,15 @@
def _merge_histogram(self, values: np.ndarray | pd.Series) -> None:
# values is the current array of values,
# that needs to be updated to the accumulated histogram
if type(values) is pd.Series:
values = pl.from_pandas(values)
values = values.to_numpy()
combined_values = np.concatenate([values, self._histogram_to_array()])
bin_counts, bin_edges = self._get_histogram(combined_values)
self._stored_histogram["histogram"]["bin_counts"] = bin_counts
self._stored_histogram["histogram"]["bin_edges"] = bin_edges

def _update_histogram(self, df_series: pd.Series) -> None:
def _update_histogram(self, df_series: pd.Series | np.ndarray) -> None:
"""
Update histogram for each method and the combined method.

@@ -1348,12 +1355,23 @@ def _update_histogram(self, df_series: pd.Series) -> None:
:type df_series: pandas.core.series.Series
:return:
"""
df_series = df_series.replace([np.inf, -np.inf], np.nan).dropna()
if df_series.empty:
return
if self._greater_than_64_bit and type(df_series) is pd.Series:
df_series = df_series.to_numpy(dtype=float)
df_series = df_series[np.isfinite(df_series)]
if df_series.size == 0:
return
else:
df_series = pl.from_pandas(df_series, nan_to_null=True).cast(pl.Float64)
df_series = df_series.replace([np.inf, -np.inf], [None]) # type: ignore
df_series = df_series.drop_nulls()
if df_series.is_empty():
return

if self._has_histogram:
self._merge_histogram(df_series.tolist())
if self._greater_than_64_bit:
self._merge_histogram(df_series.tolist())
else:
self._merge_histogram(df_series.to_list())
else:
bin_counts, bin_edges = self._get_histogram(df_series)
self._stored_histogram["histogram"]["bin_counts"] = bin_counts
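
The two cleaning paths in _update_histogram can be read in isolation: oversized values stay in numpy (float coercion plus an np.isfinite mask), while the ordinary path casts through polars with NaNs mapped to nulls. A condensed sketch under those assumptions; the helper name and data are hypothetical, and it filters with is_finite in place of the replace/drop_nulls pair used in the diff:

    import numpy as np
    import pandas as pd
    import polars as pl

    def clean_batch(df_series: pd.Series, greater_than_64_bit: bool):
        if greater_than_64_bit:
            # Values wider than 64 bits stay in numpy; drop inf/NaN.
            arr = df_series.to_numpy(dtype=float)
            return arr[np.isfinite(arr)]
        # Ordinary path: NaN -> null on conversion, cast, keep finite values.
        s = pl.from_pandas(df_series, nan_to_null=True).cast(pl.Float64)
        return s.filter(s.is_finite())

    clean_batch(pd.Series([1.0, np.inf, None, 2.0]), False)  # [1.0, 2.0]
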
@@ -1741,8 +1759,26 @@ def _update_helper(self, df_series_clean: pd.Series, profile: dict) -> None:
:type profile: dict
:return: None
"""
if df_series_clean.empty:
return
self._greater_than_64_bit = (
not df_series_clean.empty
and df_series_clean.apply(pd.to_numeric, errors="coerce").dtype == "O"
)
if self._greater_than_64_bit:
Contributor: Good design choice here I like it

df_series_clean = df_series_clean.to_numpy()
df_series_clean = df_series_clean[df_series_clean != np.nan]
if df_series_clean.size == 0:
return
df_series_clean = pd.Series(df_series_clean)
else:
df_series_clean = pl.from_pandas(df_series_clean)
if df_series_clean.dtype == pl.String:
df_series_clean = df_series_clean.str.strip_chars().cast(pl.Float64)
else:
df_series_clean = df_series_clean.cast(pl.Float64)
if df_series_clean.is_empty():
return
df_series_clean = df_series_clean.to_pandas()
df_series_clean = df_series_clean.astype(float)
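
The _greater_than_64_bit check leans on a pandas behavior worth spelling out: pd.to_numeric leaves integers too wide for any 64-bit dtype as Python ints, so the elementwise apply keeps object dtype, while anything that fits coerces to int64/float64. A quick illustration with made-up values:

    import pandas as pd

    small = pd.Series(["1", "2", "3"])
    huge = pd.Series([2**70, 2**80])  # wider than any 64-bit dtype

    small.apply(pd.to_numeric, errors="coerce").dtype  # int64 -> flag stays False
    huge.apply(pd.to_numeric, errors="coerce").dtype   # object -> flag is True
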

prev_dependent_properties = {
"mean": self.mean,
Expand All @@ -1751,7 +1787,6 @@ def _update_helper(self, df_series_clean: pd.Series, profile: dict) -> None:
"biased_kurtosis": self._biased_kurtosis,
}
subset_properties = copy.deepcopy(profile)
df_series_clean = df_series_clean.astype(float)
super()._perform_property_calcs( # type: ignore
self.__calculations,
df_series=df_series_clean,
@@ -1765,63 +1800,89 @@
@BaseColumnProfiler._timeit(name="min")
def _get_min(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
min_value = df_series.min()
self.min = min_value if not self.min else min(self.min, min_value)
if self._greater_than_64_bit:
min_value = np.min(df_series)
self.min = min_value if not self.min else min(self.min, min_value)
else:
df_series = pl.from_pandas(df_series)
min_value = df_series.min()
self.min = np.float64(
min_value if not self.min else min(self.min, min_value)
)
subset_properties["min"] = min_value

@BaseColumnProfiler._timeit(name="max")
def _get_max(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
max_value = df_series.max()
self.max = max_value if not self.max else max(self.max, max_value)
if self._greater_than_64_bit:
max_value = np.max(df_series)
self.max = max_value if not self.max else max(self.max, max_value)
else:
df_series = pl.from_pandas(df_series)
max_value = df_series.max()
if self.max is not None:
max_value = type(self.max)(max_value)
self.max = np.float64(
max_value if not self.max else max(self.max, max_value)
)
subset_properties["max"] = max_value

@BaseColumnProfiler._timeit(name="sum")
def _get_sum(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
if np.isinf(self.sum) or (np.isnan(self.sum) and self.match_count > 0):
return

sum_value = df_series.sum()
if np.isinf(sum_value) or (len(df_series) > 0 and np.isnan(sum_value)):
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)
if self._greater_than_64_bit:
sum_value = np.sum(df_series)
if len(df_series) > 0 and sum_value == np.nan:
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)
else:
df_series = pl.from_pandas(df_series)
sum_value = df_series.sum()
if np.isinf(sum_value) or (len(df_series) > 0 and np.isnan(sum_value)):
warnings.warn(
"Infinite or invalid values found in data. "
"Future statistics (mean, variance, skewness, kurtosis) "
"will not be computed.",
RuntimeWarning,
)

subset_properties["sum"] = sum_value
self.sum = self.sum + sum_value
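
One numpy subtlety in the oversized branch: sum_value == np.nan always evaluates to False because NaN never compares equal to anything, which is why NaN checks ordinarily go through np.isnan, as on the polars branch. A quick check:

    import numpy as np

    np.nan == np.nan   # False: NaN is not equal to itself
    np.isnan(np.nan)   # True
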

@BaseColumnProfiler._timeit(name="variance")
def _get_variance(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
if np.isinf(self._biased_variance) or (
np.isnan(self._biased_variance) and self.match_count > 0
):
return

# Suppress any numpy warnings as we have a custom warning for invalid
# or infinite data already
with np.errstate(all="ignore"):
batch_biased_variance = np.var(df_series) # Obtains biased variance
if self._greater_than_64_bit:
batch_biased_variance = np.var(df_series)
else:
df_series = pl.from_pandas(df_series)
batch_biased_variance = np.var([df_series])
subset_properties["biased_variance"] = batch_biased_variance
sum_value = subset_properties["sum"]
batch_count = subset_properties["match_count"]
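
_merge_biased_variance itself is collapsed out of this diff; its job is to fold the batch's biased variance into the running value. For reference, the standard pooled update for the biased variance of two disjoint batches (Chan et al.), with illustrative names rather than the project's API:

    def merge_biased_variance(n1, var1, mean1, n2, var2, mean2):
        # Pooled biased (population) variance of two disjoint batches.
        if n1 == 0:
            return var2
        if n2 == 0:
            return var1
        delta = mean2 - mean1
        m2 = n1 * var1 + n2 * var2 + delta**2 * n1 * n2 / (n1 + n2)
        return m2 / (n1 + n2)
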
@@ -1839,7 +1900,7 @@ def _get_variance(
@BaseColumnProfiler._timeit(name="skewness")
def _get_skewness(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
Contributor: this (and following) added to account for larger than 64 bit?
Contributor Author: yup

prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
@@ -1883,7 +1944,7 @@ def _get_skewness(
@BaseColumnProfiler._timeit(name="kurtosis")
def _get_kurtosis(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
@@ -1930,7 +1991,7 @@ def _get_kurtosis(
@BaseColumnProfiler._timeit(name="histogram_and_quantiles")
def _get_histogram_and_quantiles(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
@@ -1948,7 +2009,7 @@ def _get_histogram_and_quantiles(
@BaseColumnProfiler._timeit(name="num_zeros")
def _get_num_zeros(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
@@ -1963,14 +2024,16 @@ def _get_num_zeros(
:type subset_properties: dict
:return: None
"""
if not self._greater_than_64_bit:
df_series = pl.from_pandas(df_series)
num_zeros_value = (df_series == 0).sum()
subset_properties["num_zeros"] = num_zeros_value
self.num_zeros = self.num_zeros + num_zeros_value

@BaseColumnProfiler._timeit(name="num_negatives")
def _get_num_negatives(
self,
df_series: pd.Series,
df_series: pd.Series | np.ndarray,
prev_dependent_properties: dict,
subset_properties: dict,
) -> None:
@@ -1985,6 +2048,8 @@ def _get_num_negatives(
:type subset_properties: dict
:return: None
"""
if not self._greater_than_64_bit:
df_series = pl.from_pandas(df_series)
num_negatives_value = (df_series < 0).sum()
subset_properties["num_negatives"] = num_negatives_value
self.num_negatives = self.num_negatives + num_negatives_value
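
On the default path both counters now do the comparison in polars, where, as in pandas, a boolean Series sums to its count of True values. A quick sanity check with made-up data:

    import pandas as pd
    import polars as pl

    s = pl.from_pandas(pd.Series([-2.0, 0.0, 3.0, 0.0]))
    (s == 0).sum()  # 2 zeros
    (s < 0).sum()   # 1 negative
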
2 changes: 2 additions & 0 deletions dataprofiler/tests/profilers/test_float_column_profile.py
@@ -1792,6 +1792,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
@@ -1890,6 +1891,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 1,
"num_negatives": 0,
"_num_quantiles": 4,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 2.0,
2 changes: 2 additions & 0 deletions dataprofiler/tests/profilers/test_int_column_profile.py
@@ -1151,6 +1151,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
@@ -1233,6 +1234,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 1,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": {
"custom": {
"total_loss": 0.0,
(additional test file; name not captured)
@@ -1234,6 +1234,7 @@ def test_json_encode(self):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": expected_historam_methods,
"_stored_histogram": {
"total_loss": 0.0,
1 change: 1 addition & 0 deletions dataprofiler/tests/profilers/test_text_column_profile.py
@@ -671,6 +671,7 @@ def test_json_encode_after_update(self, time):
"num_zeros": 0,
"num_negatives": 0,
"_num_quantiles": 1000,
"_greater_than_64_bit": False,
"histogram_methods": {
"custom": {
"total_loss": 0.0,