diff --git a/dataprofiler/profilers/histogram_utils.py b/dataprofiler/profilers/histogram_utils.py index df230c4c7..8870a2142 100644 --- a/dataprofiler/profilers/histogram_utils.py +++ b/dataprofiler/profilers/histogram_utils.py @@ -8,7 +8,7 @@ https://github.com/numpy/numpy/blob/main/LICENSE.txt """ import operator -from typing import List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import numpy as np from numpy.lib.histograms import ( # type: ignore[attr-defined] @@ -348,3 +348,106 @@ def _calculate_bins_from_profile(profile, bin_method): # the IQR of the data is zero. n_equal_bins = 1 return n_equal_bins + + +def _assimilate_histogram( + from_hist_entity_count_per_bin: np.ndarray, + from_hist_bin_edges: np.ndarray, + dest_hist_entity_count_per_bin: np.ndarray, + dest_hist_bin_edges: np.ndarray, + dest_hist_num_bin: int, +) -> Tuple[Dict[str, np.ndarray], float]: + """ + Assimilates a histogram into another histogram using specifications. + + :param from_hist_entity_count_per_bin: the breakdown of number of entities + within a given histogram bin (to be assimilated) + :type from_hist_entity_count_per_bin: List[float] + :param from_hist_bin_edges: List of value ranges for histogram bins + (to be assimilated) + :type from_hist_bin_edges: List[Tuple[float]] + :param dest_hist_entity_count_per_bin: the breakdown of number of + entities within a given histogram bin (assimilated to) + :type dest_hist_entity_count_per_bin: List[float] + :param dest_hist_bin_edges: List of value ranges for histogram bins + (assimilated to) + :type dest_hist_bin_edges: List[Tuple[float]] + :param dest_hist_num_bin: The number of bins desired for histogram + :type dest_hist_num_bin: int + :return: Tuple containing dictionary of histogram info and histogram loss + """ + # allocate bin_counts + new_bin_id = 0 + hist_loss = 0 + for bin_id, bin_count in enumerate(from_hist_entity_count_per_bin): + if not bin_count: # if nothing in bin, nothing to add + 
continue + + bin_edge = from_hist_bin_edges[bin_id : bin_id + 3] + + # loop until we have a new bin which contains the current bin. + while ( + bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1] + and new_bin_id < dest_hist_num_bin - 1 + ): + new_bin_id += 1 + + new_bin_edge = dest_hist_bin_edges[new_bin_id : new_bin_id + 3] + + # find where the current bin falls within the new bins + is_last_bin = new_bin_id == dest_hist_num_bin - 1 + if bin_edge[1] < new_bin_edge[1] or is_last_bin: + # current bin is within the new bin + dest_hist_entity_count_per_bin[new_bin_id] += bin_count + hist_loss += ( + ((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) / 2 + ) ** 2 * bin_count + elif bin_edge[0] < new_bin_edge[1]: + # current bin straddles two of the new bins + # get the percentage of bin that falls to the left + percentage_in_left_bin = (new_bin_edge[1] - bin_edge[0]) / ( + bin_edge[1] - bin_edge[0] + ) + count_in_left_bin = round(bin_count * percentage_in_left_bin) + dest_hist_entity_count_per_bin[new_bin_id] += count_in_left_bin + hist_loss += ( + ((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) / 2 + ) ** 2 * count_in_left_bin + + # allocate leftovers to the right bin + dest_hist_entity_count_per_bin[new_bin_id + 1] += ( + bin_count - count_in_left_bin + ) + hist_loss += ( + ((new_bin_edge[2] + new_bin_edge[1]) - (bin_edge[1] + bin_edge[0])) / 2 + ) ** 2 * (bin_count - count_in_left_bin) + + # increment bin id to the right bin + new_bin_id += 1 + return ( + { + "bin_edges": dest_hist_bin_edges, + "bin_counts": dest_hist_entity_count_per_bin, + }, + hist_loss, + ) + + +def _regenerate_histogram( + entity_count_per_bin, bin_edges, suggested_bin_count, options=None +) -> Tuple[Dict[str, np.ndarray], float]: + + # create proper binning + new_bin_counts = np.zeros((suggested_bin_count,)) + new_bin_edges = np.linspace(bin_edges[0], bin_edges[-1], suggested_bin_count + 1) + if options: + new_bin_edges = np.linspace( + options["min_edge"], 
options["max_edge"], suggested_bin_count + 1 + ) + return _assimilate_histogram( + from_hist_entity_count_per_bin=entity_count_per_bin, + from_hist_bin_edges=bin_edges, + dest_hist_entity_count_per_bin=new_bin_counts, + dest_hist_bin_edges=new_bin_edges, + dest_hist_num_bin=suggested_bin_count, + ) diff --git a/dataprofiler/profilers/numerical_column_stats.py b/dataprofiler/profilers/numerical_column_stats.py index 2b35c8792..3d4531da5 100644 --- a/dataprofiler/profilers/numerical_column_stats.py +++ b/dataprofiler/profilers/numerical_column_stats.py @@ -226,7 +226,7 @@ def _add_helper_merge_profile_histograms( new_entity_count_by_bin = np.zeros((ideal_count_of_bins,)) # Generate new histograms - _, hist_loss1 = self._assimilate_histogram( + _, hist_loss1 = histogram_utils._assimilate_histogram( from_hist_entity_count_per_bin=other1._stored_histogram["histogram"][ "bin_counts" ], @@ -237,7 +237,7 @@ def _add_helper_merge_profile_histograms( ) # Ensure loss is calculated on second run of regenerate - _, hist_loss2 = self._assimilate_histogram( + _, hist_loss2 = histogram_utils._assimilate_histogram( from_hist_entity_count_per_bin=other2._stored_histogram["histogram"][ "bin_counts" ], @@ -706,7 +706,7 @@ def _preprocess_for_calculate_psi( # re-calculate `self` histogram if len_self_bin_counts != num_psi_bins: - histogram, hist_loss = self._regenerate_histogram( + histogram, hist_loss = histogram_utils._regenerate_histogram( entity_count_per_bin=self_histogram["bin_counts"], bin_edges=self_histogram["bin_edges"], suggested_bin_count=num_psi_bins, @@ -727,7 +727,7 @@ def _preprocess_for_calculate_psi( histogram_edges_not_equal = True if histogram_edges_not_equal: - histogram, hist_loss = self._regenerate_histogram( + histogram, hist_loss = histogram_utils._regenerate_histogram( entity_count_per_bin=other_histogram["bin_counts"], bin_edges=other_histogram["bin_edges"], suggested_bin_count=num_psi_bins, @@ -1359,119 +1359,6 @@ def _update_histogram(self, df_series: 
pd.Series) -> None: self._stored_histogram["current_loss"] = histogram_loss self._stored_histogram["total_loss"] += histogram_loss - def _regenerate_histogram( - self, entity_count_per_bin, bin_edges, suggested_bin_count, options=None - ) -> tuple[dict[str, np.ndarray], float]: - - # create proper binning - new_bin_counts = np.zeros((suggested_bin_count,)) - new_bin_edges = np.linspace( - bin_edges[0], bin_edges[-1], suggested_bin_count + 1 - ) - if options: - new_bin_edges = np.linspace( - options["min_edge"], options["max_edge"], suggested_bin_count + 1 - ) - return self._assimilate_histogram( - from_hist_entity_count_per_bin=entity_count_per_bin, - from_hist_bin_edges=bin_edges, - dest_hist_entity_count_per_bin=new_bin_counts, - dest_hist_bin_edges=new_bin_edges, - dest_hist_num_bin=suggested_bin_count, - ) - - def _assimilate_histogram( - self, - from_hist_entity_count_per_bin: np.ndarray, - from_hist_bin_edges: np.ndarray, - dest_hist_entity_count_per_bin: np.ndarray, - dest_hist_bin_edges: np.ndarray, - dest_hist_num_bin: int, - ) -> tuple[dict[str, np.ndarray[Any, Any]], float]: - """ - Assimilates a histogram into another histogram using specifications. 
- - :param from_hist_entity_count_per_bin: the breakdown of number of entities - within a given histogram bin (to be assimilated) - :type from_hist_entity_count_per_bin: List[float] - :param from_hist_bin_edges: List of value ranges for histogram bins - (to be assimilated) - :type from_hist_bin_edges: List[Tuple[float]] - :param from_hist_entity_count_per_bin: the breakdown of number of - entities within a given histogram bin (assimilated to) - :type dest_hist_entity_count_per_bin: List[float] - :param dest_hist_bin_edges: List of value ranges for histogram bins - (assimilated to) - :type dest_hist_bin_edges: List[Tuple[float]] - :type dest_hist_bin_edges: List[Tuple[float] - :param dest_hist_num_bin: The number of bins desired for histogram - :type dest_hist_num_bin: int - :return: Tuple containing dictionary of histogram info and histogram loss - """ - # allocate bin_counts - new_bin_id = 0 - hist_loss = 0 - for bin_id, bin_count in enumerate(from_hist_entity_count_per_bin): - if not bin_count: # if nothing in bin, nothing to add - continue - - bin_edge = from_hist_bin_edges[bin_id : bin_id + 3] - - # if we know not float, we can assume values in bins are integers. - is_float_profile = self.__class__.__name__ == "FloatColumn" - if not is_float_profile: - bin_edge = np.round(bin_edge) - - # loop until we have a new bin which contains the current bin. 
- while ( - bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1] - and new_bin_id < dest_hist_num_bin - 1 - ): - new_bin_id += 1 - - new_bin_edge = dest_hist_bin_edges[new_bin_id : new_bin_id + 3] - - # find where the current bin falls within the new bins - is_last_bin = new_bin_id == dest_hist_num_bin - 1 - if bin_edge[1] < new_bin_edge[1] or is_last_bin: - # current bin is within the new bin - dest_hist_entity_count_per_bin[new_bin_id] += bin_count - hist_loss += ( - ((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) - / 2 - ) ** 2 * bin_count - elif bin_edge[0] < new_bin_edge[1]: - # current bin straddles two of the new bins - # get the percentage of bin that falls to the left - percentage_in_left_bin = (new_bin_edge[1] - bin_edge[0]) / ( - bin_edge[1] - bin_edge[0] - ) - count_in_left_bin = round(bin_count * percentage_in_left_bin) - dest_hist_entity_count_per_bin[new_bin_id] += count_in_left_bin - hist_loss += ( - ((new_bin_edge[1] + new_bin_edge[0]) - (bin_edge[1] + bin_edge[0])) - / 2 - ) ** 2 * count_in_left_bin - - # allocate leftovers to the right bin - dest_hist_entity_count_per_bin[new_bin_id + 1] += ( - bin_count - count_in_left_bin - ) - hist_loss += ( - ((new_bin_edge[2] + new_bin_edge[1]) - (bin_edge[1] + bin_edge[0])) - / 2 - ) ** 2 * (bin_count - count_in_left_bin) - - # increment bin id to the right bin - new_bin_id += 1 - return ( - { - "bin_edges": dest_hist_bin_edges, - "bin_counts": dest_hist_entity_count_per_bin, - }, - hist_loss, - ) - def _histogram_for_profile( self, histogram_method: str ) -> tuple[dict[str, np.ndarray], float]: @@ -1509,7 +1396,7 @@ def _histogram_for_profile( self._stored_histogram["total_loss"], ) - return self._regenerate_histogram( + return histogram_utils._regenerate_histogram( entity_count_per_bin=bin_counts, bin_edges=bin_edges, suggested_bin_count=suggested_bin_count,