Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor _assimilate_histogram and _regenerate_histogram #1022

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion dataprofiler/profilers/histogram_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
https://github.com/numpy/numpy/blob/main/LICENSE.txt
"""
import operator
from typing import List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
from numpy.lib.histograms import ( # type: ignore[attr-defined]
Expand Down Expand Up @@ -348,3 +348,106 @@ def _calculate_bins_from_profile(profile, bin_method):
# the IQR of the data is zero.
n_equal_bins = 1
return n_equal_bins


def _assimilate_histogram(
from_hist_entity_count_per_bin: np.ndarray,
from_hist_bin_edges: np.ndarray,
dest_hist_entity_count_per_bin: np.ndarray,
dest_hist_bin_edges: np.ndarray,
dest_hist_num_bin: int,
) -> Tuple[Dict[str, np.ndarray], float]:
"""
Assimilates a histogram into another histogram using specifications.

:param from_hist_entity_count_per_bin: the breakdown of number of entities
within a given histogram bin (to be assimilated)
:type from_hist_entity_count_per_bin: List[float]
:param from_hist_bin_edges: List of value ranges for histogram bins
(to be assimilated)
:type from_hist_bin_edges: List[Tuple[float]]
:param from_hist_entity_count_per_bin: the breakdown of number of
entities within a given histogram bin (assimilated to)
:type dest_hist_entity_count_per_bin: List[float]
:param dest_hist_bin_edges: List of value ranges for histogram bins
(assimilated to)
:type dest_hist_bin_edges: List[Tuple[float]]
:param dest_hist_num_bin: The number of bins desired for histogram
:type dest_hist_num_bin: int
:return: Tuple containing dictionary of histogram info and histogram loss
"""
# allocate bin_counts
new_bin_id = 0
hist_loss = 0
for bin_id, bin_count in enumerate(from_hist_entity_count_per_bin):
if not bin_count: # if nothing in bin, nothing to add
continue

bin_edge = from_hist_bin_edges[bin_id : bin_id + 3]

# loop until we have a new bin which contains the current bin.
while (
bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1]
and new_bin_id < dest_hist_num_bin - 1
):
new_bin_id += 1

new_bin_edge = dest_hist_bin_edges[new_bin_id : new_bin_id + 3]

# find where the current bin falls within the new bins
is_last_bin = new_bin_id == dest_hist_num_bin - 1
if bin_edge[1] < new_bin_edge[1] or is_last_bin:
# current bin is within the new bin
dest_hist_entity_count_per_bin[new_bin_id] += bin_count
hist_loss += (
((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) / 2
) ** 2 * bin_count
elif bin_edge[0] < new_bin_edge[1]:
# current bin straddles two of the new bins
# get the percentage of bin that falls to the left
percentage_in_left_bin = (new_bin_edge[1] - bin_edge[0]) / (
bin_edge[1] - bin_edge[0]
)
count_in_left_bin = round(bin_count * percentage_in_left_bin)
dest_hist_entity_count_per_bin[new_bin_id] += count_in_left_bin
hist_loss += (
((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) / 2
) ** 2 * count_in_left_bin

# allocate leftovers to the right bin
dest_hist_entity_count_per_bin[new_bin_id + 1] += (
bin_count - count_in_left_bin
)
hist_loss += (
((new_bin_edge[2] + new_bin_edge[1]) - (bin_edge[1] + bin_edge[0])) / 2
) ** 2 * (bin_count - count_in_left_bin)

# increment bin id to the right bin
new_bin_id += 1
return (
{
"bin_edges": dest_hist_bin_edges,
"bin_counts": dest_hist_entity_count_per_bin,
},
hist_loss,
)


def _regenerate_histogram(
entity_count_per_bin, bin_edges, suggested_bin_count, options=None
) -> Tuple[Dict[str, np.ndarray], float]:

# create proper binning
new_bin_counts = np.zeros((suggested_bin_count,))
new_bin_edges = np.linspace(bin_edges[0], bin_edges[-1], suggested_bin_count + 1)
if options:
new_bin_edges = np.linspace(
options["min_edge"], options["max_edge"], suggested_bin_count + 1
)
return _assimilate_histogram(
from_hist_entity_count_per_bin=entity_count_per_bin,
from_hist_bin_edges=bin_edges,
dest_hist_entity_count_per_bin=new_bin_counts,
dest_hist_bin_edges=new_bin_edges,
dest_hist_num_bin=suggested_bin_count,
)
123 changes: 5 additions & 118 deletions dataprofiler/profilers/numerical_column_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _add_helper_merge_profile_histograms(
new_entity_count_by_bin = np.zeros((ideal_count_of_bins,))

# Generate new histograms
_, hist_loss1 = self._assimilate_histogram(
_, hist_loss1 = histogram_utils._assimilate_histogram(
from_hist_entity_count_per_bin=other1._stored_histogram["histogram"][
"bin_counts"
],
Expand All @@ -237,7 +237,7 @@ def _add_helper_merge_profile_histograms(
)

# Ensure loss is calculated on second run of regenerate
_, hist_loss2 = self._assimilate_histogram(
_, hist_loss2 = histogram_utils._assimilate_histogram(
from_hist_entity_count_per_bin=other2._stored_histogram["histogram"][
"bin_counts"
],
Expand Down Expand Up @@ -706,7 +706,7 @@ def _preprocess_for_calculate_psi(

# re-calculate `self` histogram
if len_self_bin_counts != num_psi_bins:
histogram, hist_loss = self._regenerate_histogram(
histogram, hist_loss = histogram_utils._regenerate_histogram(
entity_count_per_bin=self_histogram["bin_counts"],
bin_edges=self_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
Expand All @@ -727,7 +727,7 @@ def _preprocess_for_calculate_psi(
histogram_edges_not_equal = True

if histogram_edges_not_equal:
histogram, hist_loss = self._regenerate_histogram(
histogram, hist_loss = histogram_utils._regenerate_histogram(
entity_count_per_bin=other_histogram["bin_counts"],
bin_edges=other_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
Expand Down Expand Up @@ -1359,119 +1359,6 @@ def _update_histogram(self, df_series: pd.Series) -> None:
self._stored_histogram["current_loss"] = histogram_loss
self._stored_histogram["total_loss"] += histogram_loss

def _regenerate_histogram(
self, entity_count_per_bin, bin_edges, suggested_bin_count, options=None
) -> tuple[dict[str, np.ndarray], float]:

# create proper binning
new_bin_counts = np.zeros((suggested_bin_count,))
new_bin_edges = np.linspace(
bin_edges[0], bin_edges[-1], suggested_bin_count + 1
)
if options:
new_bin_edges = np.linspace(
options["min_edge"], options["max_edge"], suggested_bin_count + 1
)
return self._assimilate_histogram(
from_hist_entity_count_per_bin=entity_count_per_bin,
from_hist_bin_edges=bin_edges,
dest_hist_entity_count_per_bin=new_bin_counts,
dest_hist_bin_edges=new_bin_edges,
dest_hist_num_bin=suggested_bin_count,
)

def _assimilate_histogram(
self,
from_hist_entity_count_per_bin: np.ndarray,
from_hist_bin_edges: np.ndarray,
dest_hist_entity_count_per_bin: np.ndarray,
dest_hist_bin_edges: np.ndarray,
dest_hist_num_bin: int,
) -> tuple[dict[str, np.ndarray[Any, Any]], float]:
"""
Assimilates a histogram into another histogram using specifications.

:param from_hist_entity_count_per_bin: the breakdown of number of entities
within a given histogram bin (to be assimilated)
:type from_hist_entity_count_per_bin: List[float]
:param from_hist_bin_edges: List of value ranges for histogram bins
(to be assimilated)
:type from_hist_bin_edges: List[Tuple[float]]
:param from_hist_entity_count_per_bin: the breakdown of number of
entities within a given histogram bin (assimilated to)
:type dest_hist_entity_count_per_bin: List[float]
:param dest_hist_bin_edges: List of value ranges for histogram bins
(assimilated to)
:type dest_hist_bin_edges: List[Tuple[float]]
:type dest_hist_bin_edges: List[Tuple[float]
:param dest_hist_num_bin: The number of bins desired for histogram
:type dest_hist_num_bin: int
:return: Tuple containing dictionary of histogram info and histogram loss
"""
# allocate bin_counts
new_bin_id = 0
hist_loss = 0
for bin_id, bin_count in enumerate(from_hist_entity_count_per_bin):
if not bin_count: # if nothing in bin, nothing to add
continue

bin_edge = from_hist_bin_edges[bin_id : bin_id + 3]

# if we know not float, we can assume values in bins are integers.
is_float_profile = self.__class__.__name__ == "FloatColumn"
if not is_float_profile:
bin_edge = np.round(bin_edge)

# loop until we have a new bin which contains the current bin.
while (
bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1]
and new_bin_id < dest_hist_num_bin - 1
):
new_bin_id += 1

new_bin_edge = dest_hist_bin_edges[new_bin_id : new_bin_id + 3]

# find where the current bin falls within the new bins
is_last_bin = new_bin_id == dest_hist_num_bin - 1
if bin_edge[1] < new_bin_edge[1] or is_last_bin:
# current bin is within the new bin
dest_hist_entity_count_per_bin[new_bin_id] += bin_count
hist_loss += (
((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0]))
/ 2
) ** 2 * bin_count
elif bin_edge[0] < new_bin_edge[1]:
# current bin straddles two of the new bins
# get the percentage of bin that falls to the left
percentage_in_left_bin = (new_bin_edge[1] - bin_edge[0]) / (
bin_edge[1] - bin_edge[0]
)
count_in_left_bin = round(bin_count * percentage_in_left_bin)
dest_hist_entity_count_per_bin[new_bin_id] += count_in_left_bin
hist_loss += (
((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0]))
/ 2
) ** 2 * count_in_left_bin

# allocate leftovers to the right bin
dest_hist_entity_count_per_bin[new_bin_id + 1] += (
bin_count - count_in_left_bin
)
hist_loss += (
((new_bin_edge[2] + new_bin_edge[1]) - (bin_edge[1] + bin_edge[0]))
/ 2
) ** 2 * (bin_count - count_in_left_bin)

# increment bin id to the right bin
new_bin_id += 1
return (
{
"bin_edges": dest_hist_bin_edges,
"bin_counts": dest_hist_entity_count_per_bin,
},
hist_loss,
)

def _histogram_for_profile(
self, histogram_method: str
) -> tuple[dict[str, np.ndarray], float]:
Expand Down Expand Up @@ -1509,7 +1396,7 @@ def _histogram_for_profile(
self._stored_histogram["total_loss"],
)

return self._regenerate_histogram(
return histogram_utils._regenerate_histogram(
entity_count_per_bin=bin_counts,
bin_edges=bin_edges,
suggested_bin_count=suggested_bin_count,
Expand Down
Loading