From 16c64c9beba330350f1dfa4b44fbf91b4226e64b Mon Sep 17 00:00:00 2001
From: Jan Cap
Date: Sat, 16 Dec 2023 20:05:58 +0100
Subject: [PATCH] feat: add spark support

---
 src/ydata_profiling/model/pandas/__init__.py  |  4 --
 .../model/pandas/describe_counts_pandas.py    | 64 -------------------
 .../model/pandas/describe_generic_pandas.py   | 37 -----------
 src/ydata_profiling/model/spark/__init__.py   |  4 --
 .../model/spark/correlations_spark.py         |  1 +
 .../model/spark/describe_boolean_spark.py     |  7 +-
 .../model/spark/describe_categorical_spark.py |  5 +-
 .../model/spark/describe_date_spark.py        |  5 +-
 .../model/spark/describe_generic_spark.py     | 32 ----------
 .../model/spark/describe_numeric_spark.py     | 21 +++---
 .../model/spark/describe_supported_spark.py   | 20 ++----
 .../model/spark/describe_text_spark.py        |  5 +-
 .../model/spark/summary_spark.py              |  1 +
 .../model/spark/timeseries_index_spark.py     |  1 +
 .../counts_spark.py}                          | 42 ++++++------
 .../spark/var_description/default_spark.py    | 46 +++++++++++++
 .../model/var_description/default.py          |  4 ++
 17 files changed, 105 insertions(+), 194 deletions(-)
 delete mode 100644 src/ydata_profiling/model/pandas/describe_counts_pandas.py
 delete mode 100644 src/ydata_profiling/model/pandas/describe_generic_pandas.py
 delete mode 100644 src/ydata_profiling/model/spark/describe_generic_spark.py
 rename src/ydata_profiling/model/spark/{describe_counts_spark.py => var_description/counts_spark.py} (58%)
 create mode 100644 src/ydata_profiling/model/spark/var_description/default_spark.py

diff --git a/src/ydata_profiling/model/pandas/__init__.py b/src/ydata_profiling/model/pandas/__init__.py
index 59ccf853c..e929d4731 100644
--- a/src/ydata_profiling/model/pandas/__init__.py
+++ b/src/ydata_profiling/model/pandas/__init__.py
@@ -3,10 +3,8 @@
     dataframe_pandas,
     describe_boolean_pandas,
     describe_categorical_pandas,
-    describe_counts_pandas,
     describe_date_pandas,
     describe_file_pandas,
-    describe_generic_pandas,
     describe_image_pandas,
     describe_numeric_pandas,
     describe_path_pandas,
@@ -27,10 +25,8 @@
     "dataframe_pandas",
     "describe_boolean_pandas",
     "describe_categorical_pandas",
-    "describe_counts_pandas",
     "describe_date_pandas",
     "describe_file_pandas",
-    "describe_generic_pandas",
     "describe_image_pandas",
     "describe_numeric_pandas",
     "describe_path_pandas",
diff --git a/src/ydata_profiling/model/pandas/describe_counts_pandas.py b/src/ydata_profiling/model/pandas/describe_counts_pandas.py
deleted file mode 100644
index 416474d25..000000000
--- a/src/ydata_profiling/model/pandas/describe_counts_pandas.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from typing import Tuple
-
-import pandas as pd
-
-from ydata_profiling.config import Settings
-from ydata_profiling.model.summary_algorithms import describe_counts
-from ydata_profiling.model.var_description.default import VarDescription
-
-
-@describe_counts.register
-def pandas_describe_counts(
-    config: Settings, series: pd.Series, summary: VarDescription
-) -> Tuple[Settings, pd.Series, VarDescription]:
-    """Counts the values in a series (with and without NaN, distinct).
-
-    Args:
-        config: report Settings object
-        series: Series for which we want to calculate the values.
-        summary: series' summary
-
-    Returns:
-        A dictionary with the count values (with and without NaN, distinct).
- """ - try: - value_counts_with_nan = series.value_counts(dropna=False) - _ = set(value_counts_with_nan.index) - hashable = True - except: # noqa: E722 - hashable = False - - summary.hashable = hashable - - if hashable: - value_counts_with_nan = value_counts_with_nan[value_counts_with_nan > 0] - - null_index = value_counts_with_nan.index.isnull() - if null_index.any(): - n_missing = value_counts_with_nan[null_index].sum() - value_counts_without_nan = value_counts_with_nan[~null_index] - else: - n_missing = 0 - value_counts_without_nan = value_counts_with_nan - - summary.update( - { - "value_counts_without_nan": value_counts_without_nan, - } - ) - - try: - summary["value_counts_index_sorted"] = summary[ - "value_counts_without_nan" - ].sort_index(ascending=True) - ordering = True - except TypeError: - ordering = False - else: - n_missing = series.isna().sum() - ordering = False - - summary["ordering"] = ordering - summary.n_missing = n_missing - - return config, series, summary diff --git a/src/ydata_profiling/model/pandas/describe_generic_pandas.py b/src/ydata_profiling/model/pandas/describe_generic_pandas.py deleted file mode 100644 index fcc5b04b6..000000000 --- a/src/ydata_profiling/model/pandas/describe_generic_pandas.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import Tuple - -import pandas as pd - -from ydata_profiling.config import Settings -from ydata_profiling.model.summary_algorithms import describe_generic -from ydata_profiling.model.var_description.default import VarDescription - - -@describe_generic.register -def pandas_describe_generic( - config: Settings, series: pd.Series, summary: VarDescription -) -> Tuple[Settings, pd.Series, VarDescription]: - """Describe generic series. - - Args: - config: report Settings object - series: The Series to describe. - summary: The dict containing the series description so far. - - Returns: - A dict containing calculated series description values. 
- """ - - # number of observations in the Series - length = len(series) - - summary.update( - { - "n": length, - "p_missing": summary.n_missing / length if length > 0 else 0, - "count": length - summary.n_missing, - "memory_size": series.memory_usage(deep=config.memory_deep), - } - ) - - return config, series, summary diff --git a/src/ydata_profiling/model/spark/__init__.py b/src/ydata_profiling/model/spark/__init__.py index 854222a9a..7dc7d5043 100644 --- a/src/ydata_profiling/model/spark/__init__.py +++ b/src/ydata_profiling/model/spark/__init__.py @@ -3,9 +3,7 @@ dataframe_spark, describe_boolean_spark, describe_categorical_spark, - describe_counts_spark, describe_date_spark, - describe_generic_spark, describe_numeric_spark, describe_supported_spark, duplicates_spark, @@ -21,9 +19,7 @@ "dataframe_spark", "describe_boolean_spark", "describe_categorical_spark", - "describe_counts_spark", "describe_date_spark", - "describe_generic_spark", "describe_numeric_spark", "describe_supported_spark", "duplicates_spark", diff --git a/src/ydata_profiling/model/spark/correlations_spark.py b/src/ydata_profiling/model/spark/correlations_spark.py index 6f0f2ae25..51c309378 100644 --- a/src/ydata_profiling/model/spark/correlations_spark.py +++ b/src/ydata_profiling/model/spark/correlations_spark.py @@ -1,4 +1,5 @@ """Correlations between variables.""" + from typing import Optional import pandas as pd diff --git a/src/ydata_profiling/model/spark/describe_boolean_spark.py b/src/ydata_profiling/model/spark/describe_boolean_spark.py index ab5cf20fb..815af74b8 100644 --- a/src/ydata_profiling/model/spark/describe_boolean_spark.py +++ b/src/ydata_profiling/model/spark/describe_boolean_spark.py @@ -4,12 +4,13 @@ from ydata_profiling.config import Settings from ydata_profiling.model.summary_algorithms import describe_boolean_1d +from ydata_profiling.model.var_description.default import VarDescription @describe_boolean_1d.register def describe_boolean_1d_spark( - config: Settings, df: DataFrame, summary: dict -) -> Tuple[Settings, DataFrame, dict]: + config: Settings, df: DataFrame, summary: VarDescription +) -> Tuple[Settings, DataFrame, VarDescription]: """Describe a boolean series. Args: @@ -20,7 +21,7 @@ def describe_boolean_1d_spark( A dict containing calculated series description values. """ - value_counts = summary["value_counts"] + value_counts = summary.value_counts # get the most common boolean value and its frequency top = value_counts.first() diff --git a/src/ydata_profiling/model/spark/describe_categorical_spark.py b/src/ydata_profiling/model/spark/describe_categorical_spark.py index 5afdb475c..562472b3d 100644 --- a/src/ydata_profiling/model/spark/describe_categorical_spark.py +++ b/src/ydata_profiling/model/spark/describe_categorical_spark.py @@ -4,12 +4,13 @@ from ydata_profiling.config import Settings from ydata_profiling.model.summary_algorithms import describe_categorical_1d +from ydata_profiling.model.var_description.default import VarDescription @describe_categorical_1d.register def describe_categorical_1d_spark( - config: Settings, df: DataFrame, summary: dict -) -> Tuple[Settings, DataFrame, dict]: + config: Settings, df: DataFrame, summary: VarDescription +) -> Tuple[Settings, DataFrame, VarDescription]: """Describe a categorical series. 
diff --git a/src/ydata_profiling/model/spark/describe_date_spark.py b/src/ydata_profiling/model/spark/describe_date_spark.py
index a5e11a0f1..4bcee2bbf 100644
--- a/src/ydata_profiling/model/spark/describe_date_spark.py
+++ b/src/ydata_profiling/model/spark/describe_date_spark.py
@@ -6,6 +6,7 @@
 
 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_date_1d
+from ydata_profiling.model.var_description.default import VarDescription
 
 
 def date_stats_spark(df: DataFrame, summary: dict) -> dict:
@@ -21,8 +22,8 @@
 
 @describe_date_1d.register
 def describe_date_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescription
+) -> Tuple[Settings, DataFrame, VarDescription]:
     """Describe a date series.
 
     Args:
diff --git a/src/ydata_profiling/model/spark/describe_generic_spark.py b/src/ydata_profiling/model/spark/describe_generic_spark.py
deleted file mode 100644
index ee2356c0a..000000000
--- a/src/ydata_profiling/model/spark/describe_generic_spark.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from typing import Tuple
-
-from pyspark.sql import DataFrame
-
-from ydata_profiling.config import Settings
-from ydata_profiling.model.summary_algorithms import describe_generic
-
-
-@describe_generic.register
-def describe_generic_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
-    """Describe generic series.
-    Args:
-        series: The Series to describe.
-        summary: The dict containing the series description so far.
-    Returns:
-        A dict containing calculated series description values.
-    """
-
-    # number of observations in the Series
-    length = df.count()
-
-    summary["n"] = length
-    summary["p_missing"] = summary["n_missing"] / length
-    summary["count"] = length - summary["n_missing"]
-
-    # FIXME: This is not correct, but used to fulfil render expectations
-    # @chanedwin
-    summary["memory_size"] = 0
-
-    return config, df, summary
diff --git a/src/ydata_profiling/model/spark/describe_numeric_spark.py b/src/ydata_profiling/model/spark/describe_numeric_spark.py
index 490e33aba..a9fca55cc 100644
--- a/src/ydata_profiling/model/spark/describe_numeric_spark.py
+++ b/src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -9,9 +9,10 @@
     describe_numeric_1d,
     histogram_compute,
 )
+from ydata_profiling.model.var_description.default import VarDescription
 
 
-def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
+def numeric_stats_spark(df: DataFrame, summary: VarDescription) -> dict:
     column = df.columns[0]
 
     expr = [
@@ -29,8 +30,8 @@
 
 @describe_numeric_1d.register
 def describe_numeric_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescription
+) -> Tuple[Settings, DataFrame, VarDescription]:
     """Describe a numeric series.
 
     Args:
@@ -51,7 +52,7 @@
     summary["kurtosis"] = stats["kurtosis"]
     summary["sum"] = stats["sum"]
 
-    value_counts = summary["value_counts"]
+    value_counts = summary.value_counts
 
     n_infinite = (
         value_counts.where(F.col(df.columns[0]).isin([np.inf, -np.inf]))
@@ -106,12 +107,12 @@
     ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
 
     # FIXME: move to fmt
-    summary["p_negative"] = summary["n_negative"] / summary["n"]
+    summary["p_negative"] = summary["n_negative"] / summary.n
     summary["range"] = summary["max"] - summary["min"]
     summary["iqr"] = summary["75%"] - summary["25%"]
     summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.NaN
-    summary["p_zeros"] = summary["n_zeros"] / summary["n"]
-    summary["p_infinite"] = summary["n_infinite"] / summary["n"]
+    summary["p_zeros"] = summary["n_zeros"] / summary.n
+    summary["p_infinite"] = summary["n_infinite"] / summary.n
 
     # TODO - enable this feature
     # because spark doesn't have an indexing system, there isn't really the idea of monotonic increase/decrease
@@ -124,14 +125,14 @@
     # display in pandas display
     # the alternative is to do this in spark natively, but it is not trivial
     infinity_values = [np.inf, -np.inf]
-    infinity_index = summary["value_counts_without_nan"].index.isin(infinity_values)
+    infinity_index = summary.value_counts_without_nan.index.isin(infinity_values)
 
     summary.update(
         histogram_compute(
             config,
-            summary["value_counts_without_nan"][~infinity_index].index.values,
+            summary.value_counts_without_nan[~infinity_index].index.values,
             summary["n_distinct"],
-            weights=summary["value_counts_without_nan"][~infinity_index].values,
+            weights=summary.value_counts_without_nan[~infinity_index].values,
         )
     )
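The abs_dev / approxQuantile context above computes a median absolute deviation without collecting the column to the driver. A minimal sketch of the same pattern, assuming a running SparkSession and a toy column x, with 0.01 standing in for the config-derived quantile_threshold:

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1.0,), (2.0,), (9.0,)], ["x"])

    # approxQuantile(col, probabilities, relativeError) -> list of quantiles
    median = df.stat.approxQuantile("x", [0.5], 0.01)[0]

    # MAD = median of absolute deviations from the median
    mad = df.select(
        F.abs(F.col("x") - median).alias("abs_dev")
    ).stat.approxQuantile("abs_dev", [0.5], 0.01)[0]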
""" - # number of non-NaN observations in the Series - count = summary["count"] - n_distinct = summary["value_counts"].count() + series_description = get_default_spark_description(config, series, summary) - summary["n_distinct"] = n_distinct - summary["p_distinct"] = n_distinct / count if count > 0 else 0 - - n_unique = summary["value_counts"].where("count == 1").count() - summary["is_unique"] = n_unique == count - summary["n_unique"] = n_unique - summary["p_unique"] = n_unique / count - - return config, series, summary + return config, series, series_description diff --git a/src/ydata_profiling/model/spark/describe_text_spark.py b/src/ydata_profiling/model/spark/describe_text_spark.py index b5e27f615..6a95b2884 100644 --- a/src/ydata_profiling/model/spark/describe_text_spark.py +++ b/src/ydata_profiling/model/spark/describe_text_spark.py @@ -4,12 +4,13 @@ from ydata_profiling.config import Settings from ydata_profiling.model.summary_algorithms import describe_text_1d +from ydata_profiling.model.var_description.default import VarDescription @describe_text_1d.register def describe_text_1d_spark( - config: Settings, df: DataFrame, summary: dict -) -> Tuple[Settings, DataFrame, dict]: + config: Settings, df: DataFrame, summary: VarDescription +) -> Tuple[Settings, DataFrame, VarDescription]: """Describe a categorical series. Args: diff --git a/src/ydata_profiling/model/spark/summary_spark.py b/src/ydata_profiling/model/spark/summary_spark.py index f9ce848f3..7e8c75ece 100644 --- a/src/ydata_profiling/model/spark/summary_spark.py +++ b/src/ydata_profiling/model/spark/summary_spark.py @@ -1,4 +1,5 @@ """Compute statistical description of datasets.""" + import multiprocessing from typing import Tuple diff --git a/src/ydata_profiling/model/spark/timeseries_index_spark.py b/src/ydata_profiling/model/spark/timeseries_index_spark.py index cdf3d88dd..236825e6a 100644 --- a/src/ydata_profiling/model/spark/timeseries_index_spark.py +++ b/src/ydata_profiling/model/spark/timeseries_index_spark.py @@ -1,4 +1,5 @@ """Compute statistical description of datasets.""" + from pyspark.sql import DataFrame from ydata_profiling.config import Settings diff --git a/src/ydata_profiling/model/spark/describe_counts_spark.py b/src/ydata_profiling/model/spark/var_description/counts_spark.py similarity index 58% rename from src/ydata_profiling/model/spark/describe_counts_spark.py rename to src/ydata_profiling/model/spark/var_description/counts_spark.py index 0f813f2ce..15a2c2bd3 100644 --- a/src/ydata_profiling/model/spark/describe_counts_spark.py +++ b/src/ydata_profiling/model/spark/var_description/counts_spark.py @@ -1,23 +1,12 @@ -from typing import Tuple - from pyspark.sql import DataFrame from ydata_profiling.config import Settings -from ydata_profiling.model.summary_algorithms import describe_counts - - -@describe_counts.register -def describe_counts_spark( - config: Settings, series: DataFrame, summary: dict -) -> Tuple[Settings, DataFrame, dict]: - """Counts the values in a series (with and without NaN, distinct). +from ydata_profiling.model.var_description.counts import VarCounts - Args: - series: Series for which we want to calculate the values. - Returns: - A dictionary with the count values (with and without NaN, distinct). 
- """ +def get_counts_spark(config: Settings, series: DataFrame) -> VarCounts: + """Get a VarCounts object for a spark series.""" + length = series.count() value_counts = series.groupBy(series.columns).count() value_counts = value_counts.sort("count", ascending=False).persist() @@ -37,14 +26,10 @@ def describe_counts_spark( .squeeze(axis="columns") ) - summary["n_missing"] = n_missing - summary["value_counts"] = value_counts.persist() - summary["value_counts_index_sorted"] = value_counts_index_sorted - # this is necessary as freqtables requires value_counts_without_nan # to be a pandas series. However, if we try to get everything into # pandas we will definitly crash the server - summary["value_counts_without_nan"] = ( + value_counts_without_nan = ( value_counts.dropna() .limit(200) .toPandas() @@ -52,4 +37,19 @@ def describe_counts_spark( .squeeze(axis="columns") ) - return config, series, summary + # FIXME: This is not correct, but used to fulfil render expectations + # @chanedwin + memory_size = 0 + + return VarCounts( + hashable=False, + value_counts_without_nan=value_counts_without_nan, + value_counts_index_sorted=value_counts_index_sorted, + ordering=False, + n_missing=n_missing, + n=length, + p_missing=n_missing / length, + count=length - n_missing, + memory_size=memory_size, + value_counts=value_counts.persist(), + ) diff --git a/src/ydata_profiling/model/spark/var_description/default_spark.py b/src/ydata_profiling/model/spark/var_description/default_spark.py new file mode 100644 index 000000000..374cfb828 --- /dev/null +++ b/src/ydata_profiling/model/spark/var_description/default_spark.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from pyspark.sql import DataFrame + +from ydata_profiling.config import Settings +from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark +from ydata_profiling.model.var_description.default import VarDescription + + +def get_default_spark_description( + config: Settings, series: DataFrame, init_dict: dict +) -> VarDescription: + var_counts = get_counts_spark(config, series) + + count = var_counts.count + n_distinct = var_counts.value_counts.count() + + p_distinct = n_distinct / count if count > 0 else 0 + + n_unique = var_counts.value_counts.where("count == 1").count() + is_unique = n_unique == count + p_unique = n_unique / count + + init_dict.update( + { + "n_distinct": n_distinct, + "p_distinct": p_distinct, + "is_unique": is_unique, + "n_unique": n_unique, + "p_unique": p_unique, + } + ) + + return VarDescription( + n=var_counts.n, + count=var_counts.count, + n_missing=var_counts.n_missing, + p_missing=var_counts.p_missing, + hashable=var_counts.hashable, + memory_size=var_counts.memory_size, + ordering=var_counts.ordering, + value_counts_index_sorted=var_counts.value_counts_index_sorted, + value_counts_without_nan=var_counts.value_counts_without_nan, + value_counts=var_counts.value_counts, + var_specific=init_dict, + ) diff --git a/src/ydata_profiling/model/var_description/default.py b/src/ydata_profiling/model/var_description/default.py index 49aa63e82..05fb38ed0 100644 --- a/src/ydata_profiling/model/var_description/default.py +++ b/src/ydata_profiling/model/var_description/default.py @@ -34,6 +34,10 @@ def get(self, key: str, default: Any = None) -> Any: """To support old dict like interface.""" return self.var_specific.get(key, default) + def pop(self, key: str, default: Any = None) -> Any: + """To support old dict like interface.""" + return self.var_specific.pop(key, default) + def __iter__(self) -> 
diff --git a/src/ydata_profiling/model/spark/var_description/default_spark.py b/src/ydata_profiling/model/spark/var_description/default_spark.py
new file mode 100644
index 000000000..374cfb828
--- /dev/null
+++ b/src/ydata_profiling/model/spark/var_description/default_spark.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+from pyspark.sql import DataFrame
+
+from ydata_profiling.config import Settings
+from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark
+from ydata_profiling.model.var_description.default import VarDescription
+
+
+def get_default_spark_description(
+    config: Settings, series: DataFrame, init_dict: dict
+) -> VarDescription:
+    var_counts = get_counts_spark(config, series)
+
+    count = var_counts.count
+    n_distinct = var_counts.value_counts.count()
+
+    p_distinct = n_distinct / count if count > 0 else 0
+
+    n_unique = var_counts.value_counts.where("count == 1").count()
+    is_unique = n_unique == count
+    p_unique = n_unique / count
+
+    init_dict.update(
+        {
+            "n_distinct": n_distinct,
+            "p_distinct": p_distinct,
+            "is_unique": is_unique,
+            "n_unique": n_unique,
+            "p_unique": p_unique,
+        }
+    )
+
+    return VarDescription(
+        n=var_counts.n,
+        count=var_counts.count,
+        n_missing=var_counts.n_missing,
+        p_missing=var_counts.p_missing,
+        hashable=var_counts.hashable,
+        memory_size=var_counts.memory_size,
+        ordering=var_counts.ordering,
+        value_counts_index_sorted=var_counts.value_counts_index_sorted,
+        value_counts_without_nan=var_counts.value_counts_without_nan,
+        value_counts=var_counts.value_counts,
+        var_specific=init_dict,
+    )
diff --git a/src/ydata_profiling/model/var_description/default.py b/src/ydata_profiling/model/var_description/default.py
index 49aa63e82..05fb38ed0 100644
--- a/src/ydata_profiling/model/var_description/default.py
+++ b/src/ydata_profiling/model/var_description/default.py
@@ -34,6 +34,10 @@
     def get(self, key: str, default: Any = None) -> Any:
         """To support old dict like interface."""
         return self.var_specific.get(key, default)
 
+    def pop(self, key: str, default: Any = None) -> Any:
+        """To support old dict like interface."""
+        return self.var_specific.pop(key, default)
+
     def __iter__(self) -> Iterator:
         """To support old dict like interface."""
         return self.var_specific.__iter__()
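The new pop delegate rounds out the dict-compat surface (get, pop, __iter__), so call sites that still treat a description as a dict keep working during the migration. A hypothetical stand-in showing the delegation pattern:

    from typing import Any, Dict, Iterator

    class DictCompatSketch:
        """Illustrative only; mirrors the delegates on VarDescription."""

        def __init__(self) -> None:
            self.var_specific: Dict[str, Any] = {}

        def get(self, key: str, default: Any = None) -> Any:
            return self.var_specific.get(key, default)

        def pop(self, key: str, default: Any = None) -> Any:
            return self.var_specific.pop(key, default)

        def __iter__(self) -> Iterator:
            return iter(self.var_specific)

    summary = DictCompatSketch()
    summary.var_specific["histogram"] = [1, 2, 3]
    hist = summary.pop("histogram")          # dict-style call sites keep working
    assert summary.get("histogram") is None  # popped keys are gone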