diff --git a/src/ydata_profiling/model/spark/describe_boolean_spark.py b/src/ydata_profiling/model/spark/describe_boolean_spark.py
index ab5cf20fb..ea685bff7 100644
--- a/src/ydata_profiling/model/spark/describe_boolean_spark.py
+++ b/src/ydata_profiling/model/spark/describe_boolean_spark.py
@@ -4,12 +4,13 @@
 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_boolean_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_boolean_1d.register
 def describe_boolean_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a boolean series.

     Args:
diff --git a/src/ydata_profiling/model/spark/describe_categorical_spark.py b/src/ydata_profiling/model/spark/describe_categorical_spark.py
index 5afdb475c..ff763d5c5 100644
--- a/src/ydata_profiling/model/spark/describe_categorical_spark.py
+++ b/src/ydata_profiling/model/spark/describe_categorical_spark.py
@@ -4,12 +4,13 @@
 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_categorical_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_categorical_1d.register
 def describe_categorical_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a categorical series.

     Args:
diff --git a/src/ydata_profiling/model/spark/describe_date_spark.py b/src/ydata_profiling/model/spark/describe_date_spark.py
index a5e11a0f1..71abf836c 100644
--- a/src/ydata_profiling/model/spark/describe_date_spark.py
+++ b/src/ydata_profiling/model/spark/describe_date_spark.py
@@ -6,6 +6,7 @@
 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_date_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 def date_stats_spark(df: DataFrame, summary: dict) -> dict:
@@ -21,8 +22,8 @@
 @describe_date_1d.register
 def describe_date_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a date series.

     Args:
diff --git a/src/ydata_profiling/model/spark/describe_numeric_spark.py b/src/ydata_profiling/model/spark/describe_numeric_spark.py
index 490e33aba..3111a073e 100644
--- a/src/ydata_profiling/model/spark/describe_numeric_spark.py
+++ b/src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -9,9 +9,10 @@
     describe_numeric_1d,
     histogram_compute,
 )
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


-def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
+def numeric_stats_spark(df: DataFrame, summary: VarDescriptionHashable) -> dict:
     column = df.columns[0]

     expr = [
@@ -29,8 +30,8 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:

 @describe_numeric_1d.register
 def describe_numeric_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a boolean series.

     Args:
@@ -51,7 +52,7 @@ def describe_numeric_1d_spark(
     summary["kurtosis"] = stats["kurtosis"]
     summary["sum"] = stats["sum"]

-    value_counts = summary["value_counts"]
+    value_counts = summary.value_counts

     n_infinite = (
         value_counts.where(F.col(df.columns[0]).isin([np.inf, -np.inf]))
@@ -106,12 +107,12 @@ def describe_numeric_1d_spark(
     ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]

     # FIXME: move to fmt
-    summary["p_negative"] = summary["n_negative"] / summary["n"]
+    summary["p_negative"] = summary["n_negative"] / summary.n
     summary["range"] = summary["max"] - summary["min"]
     summary["iqr"] = summary["75%"] - summary["25%"]
     summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.NaN
-    summary["p_zeros"] = summary["n_zeros"] / summary["n"]
-    summary["p_infinite"] = summary["n_infinite"] / summary["n"]
+    summary["p_zeros"] = summary["n_zeros"] / summary.n
+    summary["p_infinite"] = summary["n_infinite"] / summary.n

     # TODO - enable this feature
     # because spark doesn't have an indexing system, there isn't really the idea of monotonic increase/decrease
@@ -124,14 +125,14 @@ def describe_numeric_1d_spark(
     # display in pandas display
     # the alternative is to do this in spark natively, but it is not trivial
     infinity_values = [np.inf, -np.inf]
-    infinity_index = summary["value_counts_without_nan"].index.isin(infinity_values)
+    infinity_index = summary.value_counts_without_nan.index.isin(infinity_values)

     summary.update(
         histogram_compute(
             config,
-            summary["value_counts_without_nan"][~infinity_index].index.values,
-            summary["n_distinct"],
-            weights=summary["value_counts_without_nan"][~infinity_index].values,
+            summary.value_counts_without_nan[~infinity_index].index.values,
+            summary.n_distinct,
+            weights=summary.value_counts_without_nan[~infinity_index].values,
         )
     )
diff --git a/src/ydata_profiling/model/spark/describe_supported_spark.py b/src/ydata_profiling/model/spark/describe_supported_spark.py
index 8362b67d0..d5d395156 100644
--- a/src/ydata_profiling/model/spark/describe_supported_spark.py
+++ b/src/ydata_profiling/model/spark/describe_supported_spark.py
@@ -3,13 +3,17 @@
 from pyspark.sql import DataFrame

 from ydata_profiling.config import Settings
+from ydata_profiling.model.spark.var_description.default_spark import (
+    get_default_spark_description,
+)
 from ydata_profiling.model.summary_algorithms import describe_supported
+from ydata_profiling.model.var_description.default import VarDescription


 @describe_supported.register
 def describe_supported_spark(
     config: Settings, series: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+) -> Tuple[Settings, DataFrame, VarDescription]:
     """Describe a supported series.

     Args:
         series: The Series to describe.
@@ -18,16 +22,6 @@ def describe_supported_spark(
         A dict containing calculated series description values.
     """

-    # number of non-NaN observations in the Series
-    count = summary["count"]
-    n_distinct = summary["value_counts"].count()
+    series_description = get_default_spark_description(config, series, summary)

-    summary["n_distinct"] = n_distinct
-    summary["p_distinct"] = n_distinct / count if count > 0 else 0
-
-    n_unique = summary["value_counts"].where("count == 1").count()
-    summary["is_unique"] = n_unique == count
-    summary["n_unique"] = n_unique
-    summary["p_unique"] = n_unique / count
-
-    return config, series, summary
+    return config, series, series_description
diff --git a/src/ydata_profiling/model/spark/describe_text_spark.py b/src/ydata_profiling/model/spark/describe_text_spark.py
index b5e27f615..50e698d31 100644
--- a/src/ydata_profiling/model/spark/describe_text_spark.py
+++ b/src/ydata_profiling/model/spark/describe_text_spark.py
@@ -4,12 +4,13 @@
 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_text_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_text_1d.register
 def describe_text_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a categorical series.

     Args:
diff --git a/src/ydata_profiling/model/spark/var_description/counts_spark.py b/src/ydata_profiling/model/spark/var_description/counts_spark.py
index af5563fd9..15a2c2bd3 100644
--- a/src/ydata_profiling/model/spark/var_description/counts_spark.py
+++ b/src/ydata_profiling/model/spark/var_description/counts_spark.py
@@ -4,69 +4,52 @@
 from ydata_profiling.model.var_description.counts import VarCounts


-class VarCountsSpark(VarCounts):
-    value_counts_without_nan: DataFrame
-    """Counts of values in the series without NaN."""
-    value_counts_index_sorted: DataFrame
-    """Sorted counts of values in the series without NaN."""
-    value_counts: DataFrame
-
-    def __init__(self, config: Settings, series: DataFrame):
-        """Counts the values in a series (with and without NaN, distinct).
-
-        Args:
-            config: report Settings object
-            series: Series for which we want to calculate the values.
-            summary: series' summary
-
-        Returns:
-            A dictionary with the count values (with and without NaN, distinct).
-        """
-        length = series.count()
-
-        value_counts = series.groupBy(series.columns).count()
-        value_counts = value_counts.sort("count", ascending=False).persist()
-        value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
-
-        n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
-        if n_missing is None:
-            n_missing = 0
-        else:
-            n_missing = n_missing["count"]
-
-        # FIXME: reduce to top-n and bottom-n
-        value_counts_index_sorted = (
-            value_counts_index_sorted.limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # this is necessary as freqtables requires value_counts_without_nan
-        # to be a pandas series. However, if we try to get everything into
-        # pandas we will definitly crash the server
-        value_counts_without_nan = (
-            value_counts.dropna()
-            .limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # FIXME: This is not correct, but used to fulfil render expectations
-        # @chanedwin
-        memory_size = 0
-
-        self.value_counts = value_counts
-        super().__init__(
-            hashable=False,
-            value_counts_without_nan=value_counts_without_nan,
-            value_counts_index_sorted=value_counts_index_sorted,
-            ordering=False,
-            n_missing=n_missing,
-            n=length,
-            p_missing=n_missing / length,
-            count=length - n_missing,
-            memory_size=memory_size,
-            value_counts=value_counts.persist(),
-        )
+def get_counts_spark(config: Settings, series: DataFrame) -> VarCounts:
+    """Get a VarCounts object for a spark series."""
+    length = series.count()
+
+    value_counts = series.groupBy(series.columns).count()
+    value_counts = value_counts.sort("count", ascending=False).persist()
+    value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
+
+    n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
+    if n_missing is None:
+        n_missing = 0
+    else:
+        n_missing = n_missing["count"]
+
+    # FIXME: reduce to top-n and bottom-n
+    value_counts_index_sorted = (
+        value_counts_index_sorted.limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # this is necessary as freqtables requires value_counts_without_nan
+    # to be a pandas series. However, if we try to get everything into
+    # pandas we will definitely crash the server
+    value_counts_without_nan = (
+        value_counts.dropna()
+        .limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # FIXME: This is not correct, but used to fulfil render expectations
+    # @chanedwin
+    memory_size = 0
+
+    return VarCounts(
+        hashable=False,
+        value_counts_without_nan=value_counts_without_nan,
+        value_counts_index_sorted=value_counts_index_sorted,
+        ordering=False,
+        n_missing=n_missing,
+        n=length,
+        p_missing=n_missing / length,
+        count=length - n_missing,
+        memory_size=memory_size,
+        value_counts=value_counts.persist(),
+    )
diff --git a/src/ydata_profiling/model/spark/var_description/default_spark.py b/src/ydata_profiling/model/spark/var_description/default_spark.py
index a3e4e00ed..75de3fc60 100644
--- a/src/ydata_profiling/model/spark/var_description/default_spark.py
+++ b/src/ydata_profiling/model/spark/var_description/default_spark.py
@@ -1,55 +1,41 @@
 from __future__ import annotations

-from dataclasses import dataclass
-
 from pyspark.sql import DataFrame

 from ydata_profiling.config import Settings
-from ydata_profiling.model.spark.var_description.counts_spark import VarCountsSpark
+from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark
 from ydata_profiling.model.var_description.default import VarDescriptionHashable


-@dataclass
-class VarDescriptionSparkHashable(VarDescriptionHashable):
-    """Default description for pandas columns."""
-
-    @classmethod
-    def from_var_counts(
-        cls, var_counts: VarCountsSpark, init_dict: dict
-    ) -> VarDescriptionSparkHashable:
-        """Get a default description from a VarCountsPandas object."""
-
-        count = var_counts.count
-        n_distinct = var_counts.value_counts.count()
-
-        p_distinct = n_distinct / count if count > 0 else 0
-
-        n_unique = var_counts.value_counts.where("count == 1").count()
-        is_unique = n_unique == count
-        p_unique = n_unique / count
-
-        return VarDescriptionSparkHashable(
-            n=var_counts.n,
-            count=var_counts.count,
-            n_missing=var_counts.n_missing,
-            p_missing=var_counts.p_missing,
-            hashable=var_counts.hashable,
-            memory_size=var_counts.memory_size,
-            ordering=var_counts.ordering,
-            value_counts_index_sorted=var_counts.value_counts_index_sorted,
-            value_counts_without_nan=var_counts.value_counts_without_nan,
-            var_specific=init_dict,
-            is_unique=is_unique,
-            n_unique=n_unique,
-            n_distinct=n_distinct,
-            p_distinct=p_distinct,
-            p_unique=p_unique,
-            value_counts=var_counts.value_counts,
-        )
-
-
 def get_default_spark_description(
     config: Settings, series: DataFrame, init_dict: dict
-) -> VarDescriptionSparkHashable:
-    _var_counts = VarCountsSpark(config, series)
-    return VarDescriptionSparkHashable.from_var_counts(_var_counts, init_dict)
+) -> VarDescriptionHashable:
+    var_counts = get_counts_spark(config, series)
+
+    count = var_counts.count
+    n_distinct = var_counts.value_counts.count()
+
+    p_distinct = n_distinct / count if count > 0 else 0
+
+    n_unique = var_counts.value_counts.where("count == 1").count()
+    is_unique = n_unique == count
+    p_unique = n_unique / count
+
+    return VarDescriptionHashable(
+        n=var_counts.n,
+        count=var_counts.count,
+        n_missing=var_counts.n_missing,
+        p_missing=var_counts.p_missing,
+        hashable=var_counts.hashable,
+        memory_size=var_counts.memory_size,
+        ordering=var_counts.ordering,
+        value_counts_index_sorted=var_counts.value_counts_index_sorted,
+        value_counts_without_nan=var_counts.value_counts_without_nan,
+        var_specific=init_dict,
+        is_unique=is_unique,
+        n_unique=n_unique,
+        n_distinct=n_distinct,
+        p_distinct=p_distinct,
+        p_unique=p_unique,
+        value_counts=var_counts.value_counts,
+    )
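
Reviewer note, not part of the patch: the changed code mixes attribute access (summary.n, summary.value_counts, summary.n_distinct) with dict-style access (summary["sum"], summary["p_zeros"], summary.update(...)) on the same summary object. Below is a minimal, hypothetical sketch of the access pattern this assumes; _SketchVarDescription and its fields are illustrative stand-ins for VarDescriptionHashable, which is assumed to proxy item access to its var_specific mapping while exposing the core counts as attributes.

from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class _SketchVarDescription:
    """Hypothetical stand-in for VarDescriptionHashable (field names follow the diff)."""

    n: int
    count: int
    n_distinct: int
    value_counts: Any = None
    var_specific: Dict[str, Any] = field(default_factory=dict)

    def __getitem__(self, key: str) -> Any:
        # summary["n_zeros"] reads a type-specific statistic
        return self.var_specific[key]

    def __setitem__(self, key: str, value: Any) -> None:
        # summary["p_zeros"] = ... writes a type-specific statistic
        self.var_specific[key] = value

    def update(self, values: Dict[str, Any]) -> None:
        # summary.update(histogram_compute(...)) merges several statistics at once
        self.var_specific.update(values)


summary = _SketchVarDescription(n=100, count=95, n_distinct=10)
summary["n_zeros"] = 4
summary["p_zeros"] = summary["n_zeros"] / summary.n  # core counts stay attributes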