feat: update spark description methods
vorel99 committed Sep 22, 2023
1 parent 7bbba9e commit 954f22c
Showing 8 changed files with 111 additions and 143 deletions.
5 changes: 3 additions & 2 deletions src/ydata_profiling/model/spark/describe_boolean_spark.py
@@ -4,12 +4,13 @@

from ydata_profiling.config import Settings
from ydata_profiling.model.summary_algorithms import describe_boolean_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


@describe_boolean_1d.register
def describe_boolean_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
    """Describe a boolean series.
    Args:
5 changes: 3 additions & 2 deletions src/ydata_profiling/model/spark/describe_categorical_spark.py
@@ -4,12 +4,13 @@

from ydata_profiling.config import Settings
from ydata_profiling.model.summary_algorithms import describe_categorical_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


@describe_categorical_1d.register
def describe_categorical_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
    """Describe a categorical series.
    Args:
5 changes: 3 additions & 2 deletions src/ydata_profiling/model/spark/describe_date_spark.py
@@ -6,6 +6,7 @@

from ydata_profiling.config import Settings
from ydata_profiling.model.summary_algorithms import describe_date_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


def date_stats_spark(df: DataFrame, summary: dict) -> dict:
@@ -21,8 +22,8 @@ def date_stats_spark(df: DataFrame, summary: dict) -> dict:

@describe_date_1d.register
def describe_date_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
    """Describe a date series.
    Args:
23 changes: 12 additions & 11 deletions src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -9,9 +9,10 @@
    describe_numeric_1d,
    histogram_compute,
)
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


-def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
+def numeric_stats_spark(df: DataFrame, summary: VarDescriptionHashable) -> dict:
    column = df.columns[0]

    expr = [
@@ -29,8 +30,8 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:

@describe_numeric_1d.register
def describe_numeric_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
    """Describe a boolean series.
    Args:
@@ -51,7 +52,7 @@ def describe_numeric_1d_spark(
    summary["kurtosis"] = stats["kurtosis"]
    summary["sum"] = stats["sum"]

-    value_counts = summary["value_counts"]
+    value_counts = summary.value_counts

    n_infinite = (
        value_counts.where(F.col(df.columns[0]).isin([np.inf, -np.inf]))
@@ -106,12 +107,12 @@ def describe_numeric_1d_spark(
    ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]

    # FIXME: move to fmt
-    summary["p_negative"] = summary["n_negative"] / summary["n"]
+    summary["p_negative"] = summary["n_negative"] / summary.n
    summary["range"] = summary["max"] - summary["min"]
    summary["iqr"] = summary["75%"] - summary["25%"]
    summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.NaN
-    summary["p_zeros"] = summary["n_zeros"] / summary["n"]
-    summary["p_infinite"] = summary["n_infinite"] / summary["n"]
+    summary["p_zeros"] = summary["n_zeros"] / summary.n
+    summary["p_infinite"] = summary["n_infinite"] / summary.n

    # TODO - enable this feature
    # because spark doesn't have an indexing system, there isn't really the idea of monotonic increase/decrease
@@ -124,14 +125,14 @@ def describe_numeric_1d_spark(
    # display in pandas display
    # the alternative is to do this in spark natively, but it is not trivial
    infinity_values = [np.inf, -np.inf]
-    infinity_index = summary["value_counts_without_nan"].index.isin(infinity_values)
+    infinity_index = summary.value_counts_without_nan.index.isin(infinity_values)

    summary.update(
        histogram_compute(
            config,
-            summary["value_counts_without_nan"][~infinity_index].index.values,
-            summary["n_distinct"],
-            weights=summary["value_counts_without_nan"][~infinity_index].values,
+            summary.value_counts_without_nan[~infinity_index].index.values,
+            summary.n_distinct,
+            weights=summary.value_counts_without_nan[~infinity_index].values,
        )
    )
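The numeric describer above mixes attribute access for the shared counts (summary.n, summary.value_counts, summary.n_distinct) with item-style assignment for numeric-only statistics (summary["kurtosis"], summary["p_zeros"]). The diff does not show how VarDescriptionHashable supports both, so the following is only a minimal illustrative sketch of a container with that mixed interface; SummaryLike and its fields are hypothetical names, not the library's class.

from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SummaryLike:
    # shared counts exposed as typed attributes
    n: int
    count: int
    n_distinct: int
    value_counts: Any = None
    value_counts_without_nan: Any = None
    # per-type statistics kept in a plain dict
    var_specific: Dict[str, Any] = field(default_factory=dict)

    def __getitem__(self, key: str) -> Any:
        return self.var_specific[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.var_specific[key] = value

    def update(self, values: Dict[str, Any]) -> None:
        self.var_specific.update(values)


summary = SummaryLike(n=100, count=90, n_distinct=10)
summary["n_zeros"] = 5                                  # item-style write, as in the diff
summary["p_zeros"] = summary["n_zeros"] / summary.n     # attribute read for the shared count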
20 changes: 7 additions & 13 deletions src/ydata_profiling/model/spark/describe_supported_spark.py
@@ -3,13 +3,17 @@
from pyspark.sql import DataFrame

from ydata_profiling.config import Settings
+from ydata_profiling.model.spark.var_description.default_spark import (
+    get_default_spark_description,
+)
from ydata_profiling.model.summary_algorithms import describe_supported
+from ydata_profiling.model.var_description.default import VarDescription


@describe_supported.register
def describe_supported_spark(
    config: Settings, series: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+) -> Tuple[Settings, DataFrame, VarDescription]:
    """Describe a supported series.
    Args:
        series: The Series to describe.
@@ -18,16 +22,6 @@ def describe_supported_spark(
        A dict containing calculated series description values.
    """

-    # number of non-NaN observations in the Series
-    count = summary["count"]
-    n_distinct = summary["value_counts"].count()
+    series_description = get_default_spark_description(config, series, summary)

-    summary["n_distinct"] = n_distinct
-    summary["p_distinct"] = n_distinct / count if count > 0 else 0
-
-    n_unique = summary["value_counts"].where("count == 1").count()
-    summary["is_unique"] = n_unique == count
-    summary["n_unique"] = n_unique
-    summary["p_unique"] = n_unique / count
-
-    return config, series, summary
+    return config, series, series_description
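The distinct/unique bookkeeping removed here now lives in get_default_spark_description. For reference, a small self-contained sketch of the same Spark pattern; the ad-hoc session, sample data, and column name are purely illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(x,) for x in [1, 1, 2, 3, 3, 3]], ["value"])

# one row per distinct value with a "count" column, as built by the counts helpers
value_counts = df.groupBy("value").count()
n_distinct = value_counts.count()                     # 3 distinct values
n_unique = value_counts.where("count == 1").count()   # only the value 2 occurs once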
5 changes: 3 additions & 2 deletions src/ydata_profiling/model/spark/describe_text_spark.py
@@ -4,12 +4,13 @@

from ydata_profiling.config import Settings
from ydata_profiling.model.summary_algorithms import describe_text_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


@describe_text_1d.register
def describe_text_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
    """Describe a categorical series.
    Args:
115 changes: 49 additions & 66 deletions src/ydata_profiling/model/spark/var_description/counts_spark.py
@@ -4,69 +4,52 @@
from ydata_profiling.model.var_description.counts import VarCounts


-class VarCountsSpark(VarCounts):
-    value_counts_without_nan: DataFrame
-    """Counts of values in the series without NaN."""
-    value_counts_index_sorted: DataFrame
-    """Sorted counts of values in the series without NaN."""
-    value_counts: DataFrame
-
-    def __init__(self, config: Settings, series: DataFrame):
-        """Counts the values in a series (with and without NaN, distinct).
-        Args:
-            config: report Settings object
-            series: Series for which we want to calculate the values.
-            summary: series' summary
-        Returns:
-            A dictionary with the count values (with and without NaN, distinct).
-        """
-        length = series.count()
-
-        value_counts = series.groupBy(series.columns).count()
-        value_counts = value_counts.sort("count", ascending=False).persist()
-        value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
-
-        n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
-        if n_missing is None:
-            n_missing = 0
-        else:
-            n_missing = n_missing["count"]
-
-        # FIXME: reduce to top-n and bottom-n
-        value_counts_index_sorted = (
-            value_counts_index_sorted.limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # this is necessary as freqtables requires value_counts_without_nan
-        # to be a pandas series. However, if we try to get everything into
-        # pandas we will definitly crash the server
-        value_counts_without_nan = (
-            value_counts.dropna()
-            .limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # FIXME: This is not correct, but used to fulfil render expectations
-        # @chanedwin
-        memory_size = 0
-
-        self.value_counts = value_counts
-        super().__init__(
-            hashable=False,
-            value_counts_without_nan=value_counts_without_nan,
-            value_counts_index_sorted=value_counts_index_sorted,
-            ordering=False,
-            n_missing=n_missing,
-            n=length,
-            p_missing=n_missing / length,
-            count=length - n_missing,
-            memory_size=memory_size,
-            value_counts=value_counts.persist(),
-        )
+def get_counts_spark(config: Settings, series: DataFrame) -> VarCounts:
+    """Get a VarCounts object for a spark series."""
+    length = series.count()
+
+    value_counts = series.groupBy(series.columns).count()
+    value_counts = value_counts.sort("count", ascending=False).persist()
+    value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
+
+    n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
+    if n_missing is None:
+        n_missing = 0
+    else:
+        n_missing = n_missing["count"]
+
+    # FIXME: reduce to top-n and bottom-n
+    value_counts_index_sorted = (
+        value_counts_index_sorted.limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # this is necessary as freqtables requires value_counts_without_nan
+    # to be a pandas series. However, if we try to get everything into
+    # pandas we will definitly crash the server
+    value_counts_without_nan = (
+        value_counts.dropna()
+        .limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # FIXME: This is not correct, but used to fulfil render expectations
+    # @chanedwin
+    memory_size = 0
+
+    return VarCounts(
+        hashable=False,
+        value_counts_without_nan=value_counts_without_nan,
+        value_counts_index_sorted=value_counts_index_sorted,
+        ordering=False,
+        n_missing=n_missing,
+        n=length,
+        p_missing=n_missing / length,
+        count=length - n_missing,
+        memory_size=memory_size,
+        value_counts=value_counts.persist(),
+    )
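A hedged usage sketch for the new module-level helper, assuming a local Spark session and a default Settings object; the single-column DataFrame and the "age" column name stand in for the profiled series and are not part of the commit.

from pyspark.sql import SparkSession

from ydata_profiling.config import Settings
from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark

spark = SparkSession.builder.getOrCreate()
series = spark.createDataFrame([(1,), (1,), (None,), (4,)], ["age"])

counts = get_counts_spark(Settings(), series)
print(counts.n, counts.count, counts.n_missing)  # 4 rows, 3 non-null, 1 missing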
76 changes: 31 additions & 45 deletions src/ydata_profiling/model/spark/var_description/default_spark.py
@@ -1,55 +1,41 @@
from __future__ import annotations

-from dataclasses import dataclass
-
from pyspark.sql import DataFrame

from ydata_profiling.config import Settings
-from ydata_profiling.model.spark.var_description.counts_spark import VarCountsSpark
+from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark
from ydata_profiling.model.var_description.default import VarDescriptionHashable


-@dataclass
-class VarDescriptionSparkHashable(VarDescriptionHashable):
-    """Default description for pandas columns."""
-
-    @classmethod
-    def from_var_counts(
-        cls, var_counts: VarCountsSpark, init_dict: dict
-    ) -> VarDescriptionSparkHashable:
-        """Get a default description from a VarCountsPandas object."""
-
-        count = var_counts.count
-        n_distinct = var_counts.value_counts.count()
-
-        p_distinct = n_distinct / count if count > 0 else 0
-
-        n_unique = var_counts.value_counts.where("count == 1").count()
-        is_unique = n_unique == count
-        p_unique = n_unique / count
-
-        return VarDescriptionSparkHashable(
-            n=var_counts.n,
-            count=var_counts.count,
-            n_missing=var_counts.n_missing,
-            p_missing=var_counts.p_missing,
-            hashable=var_counts.hashable,
-            memory_size=var_counts.memory_size,
-            ordering=var_counts.ordering,
-            value_counts_index_sorted=var_counts.value_counts_index_sorted,
-            value_counts_without_nan=var_counts.value_counts_without_nan,
-            var_specific=init_dict,
-            is_unique=is_unique,
-            n_unique=n_unique,
-            n_distinct=n_distinct,
-            p_distinct=p_distinct,
-            p_unique=p_unique,
-            value_counts=var_counts.value_counts,
-        )
-
-
def get_default_spark_description(
    config: Settings, series: DataFrame, init_dict: dict
-) -> VarDescriptionSparkHashable:
-    _var_counts = VarCountsSpark(config, series)
-    return VarDescriptionSparkHashable.from_var_counts(_var_counts, init_dict)
+) -> VarDescriptionHashable:
+    var_counts = get_counts_spark(config, series)
+
+    count = var_counts.count
+    n_distinct = var_counts.value_counts.count()
+
+    p_distinct = n_distinct / count if count > 0 else 0
+
+    n_unique = var_counts.value_counts.where("count == 1").count()
+    is_unique = n_unique == count
+    p_unique = n_unique / count
+
+    return VarDescriptionHashable(
+        n=var_counts.n,
+        count=var_counts.count,
+        n_missing=var_counts.n_missing,
+        p_missing=var_counts.p_missing,
+        hashable=var_counts.hashable,
+        memory_size=var_counts.memory_size,
+        ordering=var_counts.ordering,
+        value_counts_index_sorted=var_counts.value_counts_index_sorted,
+        value_counts_without_nan=var_counts.value_counts_without_nan,
+        var_specific=init_dict,
+        is_unique=is_unique,
+        n_unique=n_unique,
+        n_distinct=n_distinct,
+        p_distinct=p_distinct,
+        p_unique=p_unique,
+        value_counts=var_counts.value_counts,
+    )
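Taken together with counts_spark.py, the refactor replaces the VarCountsSpark and VarDescriptionSparkHashable classes with two plain functions. A rough end-to-end sketch of the new flow follows; the sample data, "category" column name, default Settings, and empty init_dict are illustrative assumptions rather than part of the commit.

from pyspark.sql import SparkSession

from ydata_profiling.config import Settings
from ydata_profiling.model.spark.var_description.default_spark import (
    get_default_spark_description,
)

spark = SparkSession.builder.getOrCreate()
series = spark.createDataFrame([("a",), ("b",), ("b",), (None,)], ["category"])

# describe_supported_spark now delegates to this helper and returns its result in
# place of the plain dict that used to be threaded through the type-specific describers
description = get_default_spark_description(Settings(), series, init_dict={})
print(description.n_distinct, description.is_unique, description.p_unique)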
