Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add product of two distributions #426

Merged
merged 3 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 3 additions & 47 deletions src/sdr/_detection/_theory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
verify_not_specified,
verify_scalar,
)
from .._probability import sum_distribution


@export
Expand Down Expand Up @@ -243,7 +244,7 @@ def _h0(
h0 = scipy.stats.norm(0, np.sqrt(sigma2_per))
elif detector == "linear":
h0 = scipy.stats.chi(nu, scale=np.sqrt(sigma2_per))
h0 = _sum_distribution(h0, n_nc)
h0 = sum_distribution(h0, n_nc)
elif detector == "square-law":
h0 = scipy.stats.chi2(nu * n_nc, scale=sigma2_per)

Expand Down Expand Up @@ -489,58 +490,13 @@ def _h1(
else:
# Folded normal distribution has 1 degree of freedom
h1 = scipy.stats.foldnorm(np.sqrt(lambda_), scale=np.sqrt(sigma2_per))
h1 = _sum_distribution(h1, n_nc)
h1 = sum_distribution(h1, n_nc)
elif detector == "square-law":
h1 = scipy.stats.ncx2(nu * n_nc, lambda_ * n_nc, scale=sigma2_per)

return h1


def _sum_distribution(
dist: scipy.stats.rv_continuous, n_nc: int
) -> scipy.stats.rv_histogram | scipy.stats.rv_continuous:
r"""
Sums a distribution `n_nc` times.

Arguments:
dist: The distribution to sum.
n_nc: The number of times to sum the distribution.

Returns:
The summed distribution.
"""
if n_nc == 1:
return dist

# Determine mean and standard deviation of base distribution
mu, sigma2 = dist.stats()
sigma = np.sqrt(sigma2)

# NOTE: I was only able to get this to work with x starting at 0. When the x axis start below zero,
# I couldn't get the correct offset for the convolved x axis.

# Compute the PDF of the base distribution for 10 standard deviations about the mean
pdf_x = np.linspace(0, mu + 10 * sigma, 1_001)
pdf_y = dist.pdf(pdf_x)
dx = np.mean(np.diff(pdf_x))

# The PDF of the sum of n_nc independent random variables is the convolution of the PDF of the base distribution.
# This is efficiently computed in the frequency domain by exponentiating the FFT. The FFT must be zero-padded
# enough that the circular convolutions do not wrap around.
n_fft = scipy.fft.next_fast_len(pdf_y.size * n_nc)
Y = np.fft.fft(pdf_y, n_fft)
Y = Y**n_nc
y = np.fft.ifft(Y).real
y /= y.sum() * dx
x = np.arange(y.size) * dx + pdf_x[0]

# Adjust the histograms bins to be on either side of each point. So there is one extra point added.
x = np.append(x, x[-1] + dx)
x -= dx / 2

return scipy.stats.rv_histogram((y, x))


@export
def p_d(
snr: npt.ArrayLike,
Expand Down
84 changes: 84 additions & 0 deletions src/sdr/_probability.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,90 @@ def sum_distributions(
return scipy.stats.rv_histogram((f_Z, x))


@export
def multiply_distributions(
X: scipy.stats.rv_continuous | scipy.stats.rv_histogram,
Y: scipy.stats.rv_continuous | scipy.stats.rv_histogram,
x: npt.ArrayLike | None = None,
p: float = 1e-10,
) -> scipy.stats.rv_histogram:
r"""
Numerically calculates the distribution of the product of two independent random variables $X$ and $Y$.

Arguments:
X: The distribution of the first random variable $X$.
Y: The distribution of the second random variable $Y$.
x: The x values at which to evaluate the PDF of the product. If None, the x values are determined based on `p`.
p: The probability of exceeding the x axis, on either side, for each distribution. This is used to determine
the bounds on the x axis for the numerical convolution. Smaller values of $p$ will result in more accurate
analysis, but will require more computation.

Returns:
The distribution of the product $Z = X \cdot Y$.

Notes:
The PDF of the product of two independent random variables is calculated as follows.

$$
f_{X \cdot Y}(t) =
\int_{0}^{\infty} f_X(k) f_Y(t/k) \frac{1}{k} dk -
\int_{-\infty}^{0} f_X(k) f_Y(t/k) \frac{1}{k} dk
$$

Examples:
Compute the distribution of the product of two normal distributions.

.. ipython:: python

X = scipy.stats.norm(loc=-1, scale=0.5)
Y = scipy.stats.norm(loc=2, scale=1.5)
x = np.linspace(-15, 10, 1_001)

@savefig sdr_multiply_distributions_1.png
plt.figure(); \
plt.plot(x, X.pdf(x), label="X"); \
plt.plot(x, Y.pdf(x), label="Y"); \
plt.plot(x, sdr.multiply_distributions(X, Y).pdf(x), label="X * Y"); \
plt.hist(X.rvs(100_000) * Y.rvs(100_000), bins=101, density=True, histtype="step", label="X * Y empirical"); \
plt.legend(); \
plt.xlabel("Random variable"); \
plt.ylabel("Probability density"); \
plt.title("Product of two Normal distributions");

Group:
probability
"""
verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1)

if x is None:
# Determine the x axis of each distribution such that the probability of exceeding the x axis, on either side,
# is p.
x1_min, x1_max = _x_range(X, np.sqrt(p))
x2_min, x2_max = _x_range(Y, np.sqrt(p))
bounds = np.array([x1_min * x2_min, x1_min * x2_max, x1_max * x2_min, x1_max * x2_max])
x_min = np.min(bounds)
x_max = np.max(bounds)
x = np.linspace(x_min, x_max, 1_001)
else:
x = verify_arraylike(x, float=True, atleast_1d=True, ndim=1)
x = np.sort(x)
dx = np.mean(np.diff(x))

def integrand(k: float, xi: float) -> float:
return X.pdf(k) * Y.pdf(xi / k) * 1 / k

f_Z = np.zeros_like(x)
for i, xi in enumerate(x):
f_Z[i] = scipy.integrate.quad(integrand, 0, np.inf, args=(xi,))[0]
f_Z[i] -= scipy.integrate.quad(integrand, -np.inf, 0, args=(xi,))[0]

# Adjust the histograms bins to be on either side of each point. So there is one extra point added.
x = np.append(x, x[-1] + dx)
x -= dx / 2

return scipy.stats.rv_histogram((f_Z, x))


def _x_range(X: scipy.stats.rv_continuous, p: float) -> tuple[float, float]:
r"""
Determines the range of x values for a given distribution such that the probability of exceeding the x axis, on
Expand Down
60 changes: 60 additions & 0 deletions tests/probability/test_multiply_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import numpy as np
import scipy.stats

import sdr


def test_normal_normal():
X = scipy.stats.norm(loc=3, scale=0.5)
Y = scipy.stats.norm(loc=5, scale=1.5)
_verify(X, Y)


def test_rayleigh_rayleigh():
X = scipy.stats.rayleigh(scale=1)
Y = scipy.stats.rayleigh(loc=1, scale=2)
_verify(X, Y)


def test_rician_rician():
X = scipy.stats.rice(2)
Y = scipy.stats.rice(3)
_verify(X, Y)


def test_normal_rayleigh():
X = scipy.stats.norm(loc=-1, scale=0.5)
Y = scipy.stats.rayleigh(loc=2, scale=1.5)
_verify(X, Y)


def test_rayleigh_rician():
X = scipy.stats.rayleigh(scale=1)
Y = scipy.stats.rice(3)
_verify(X, Y)


def _verify(X, Y):
# Empirically compute the distribution
z = X.rvs(250_000) * Y.rvs(250_000)
hist, bins = np.histogram(z, bins=51, density=True)
x = bins[1:] - np.diff(bins) / 2

# Numerically compute the distribution, only do so over the histogram bins (for speed)
Z = sdr.multiply_distributions(X, Y, x)

if False:
import matplotlib.pyplot as plt

plt.figure()
plt.plot(x, X.pdf(x), label="X")
plt.plot(x, Y.pdf(x), label="Y")
plt.plot(x, Z.pdf(x), label="X * Y")
plt.hist(z, bins=51, cumulative=False, density=True, histtype="step", label="X * Y empirical")
plt.legend()
plt.xlabel("Random variable")
plt.ylabel("Probability density")
plt.title("Product of two distributions")
plt.show()

assert np.allclose(Z.pdf(x), hist, atol=np.max(hist) * 0.1)
Loading