diff --git a/README.md b/README.md index afba40198..07281acd5 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,8 @@ View all available classes and functions in the [API Reference](https://mhostett Apply additive white Gaussian noise (AWGN), frequency offset, sample rate offset, IQ imbalance. - **Link budgets**: BSC, BEC, AWGN, and BI-AWGN channel capacity, Shannon's limit on $E_b/N_0$ and $S/N$, free-space path loss, antenna gain. -- **Miscellaneous**: Packing and unpacking binary data, hexdump of binary data. +- **Miscellaneous**: Numerical calculation of PDF of sum and product of random variables. Packing and unpacking binary + data, hexdump of binary data. - **Plotting**: Time-domain, raster, correlation, stem, discrete Fourier transform (DFT), discrete-time Fourier transform (DTFT), periodogram, spectrogram, constellation, symbol map, eye diagram, phase tree, bit error rate (BER), symbol error rate (SER), probability of detection, receiver operating characteristic (ROC), diff --git a/docs/index.rst b/docs/index.rst index b0e8b7a8f..9e1b146db 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -79,7 +79,8 @@ View all available classes and functions in the `API Reference npt.NDArray[np.float64]: Computes the SNR improvement by coherent integration. Arguments: - time_bandwidth: The time-bandwidth product $T_C B_n$ in seconds-Hz (unitless). If the noise bandwidth equals + time_bandwidth: The time-bandwidth product $T_c B_n$ in seconds-Hz (unitless). If the noise bandwidth equals the sample rate, the argument equals the number of samples $N_c$ to coherently integrate. Returns: @@ -36,7 +36,7 @@ def coherent_gain(time_bandwidth: npt.ArrayLike) -> npt.NDArray[np.float64]: The coherent integration gain is the time-bandwidth product - $$G_c = 10 \log_{10} (T_C B_n) .$$ + $$G_c = 10 \log_{10} (T_c B_n) .$$ If the noise bandwidth equals the sample rate, the coherent gain is simply @@ -86,8 +86,8 @@ def coherent_gain_loss( Computes the coherent gain loss (CGL) given a time or frequency offset. Arguments: - time: The coherent integration time $T_C$ or time offset in $\Delta t$ in seconds. - freq: The frequency offset $\Delta f$ or signal bandwidth $B_C$ in Hz. + time: The coherent integration time $T_c$ or time offset in $\Delta t$ in seconds. + freq: The frequency offset $\Delta f$ or signal bandwidth $B_c$ in Hz. Returns: The coherent gain loss (CGL) in dB. @@ -96,7 +96,7 @@ def coherent_gain_loss( Coherent gain loss is the reduction in SNR due to time or frequency offsets during coherent integration. These losses are similar to scalloping loss. - The coherent gain loss of a signal integrated for $T_C$ seconds with a frequency offset of $\Delta f$ Hz is + The coherent gain loss of a signal integrated for $T_c$ seconds with a frequency offset of $\Delta f$ Hz is $$\text{CGL} = -10 \log_{10} \left( \text{sinc}^2 \left( T_c \Delta f \right) \right) ,$$ @@ -104,10 +104,10 @@ def coherent_gain_loss( $$\text{sinc}(x) = \frac{\sin(\pi x)}{\pi x} .$$ - The coherent gain loss of a signal with bandwidth $B_C$ Hz with a detection time offset of $\Delta t$ seconds + The coherent gain loss of a signal with bandwidth $B_c$ Hz with a detection time offset of $\Delta t$ seconds is - $$\text{CGL} = -10 \log_{10} \left( \text{sinc}^2 \left( \Delta t B_C \right) \right) .$$ + $$\text{CGL} = -10 \log_{10} \left( \text{sinc}^2 \left( \Delta t B_c \right) \right) .$$ Examples: Compute the coherent gain loss for an integration time of 1 ms and a frequency offset of 235 Hz. diff --git a/src/sdr/_detection/_theory.py b/src/sdr/_detection/_theory.py index cab4ab9fb..77399eb29 100644 --- a/src/sdr/_detection/_theory.py +++ b/src/sdr/_detection/_theory.py @@ -11,6 +11,7 @@ import scipy.stats from typing_extensions import Literal +from .._conversion import db as to_db from .._conversion import linear from .._helper import ( convert_output, @@ -21,6 +22,7 @@ verify_not_specified, verify_scalar, ) +from .._probability import sum_distribution @export @@ -242,7 +244,7 @@ def _h0( h0 = scipy.stats.norm(0, np.sqrt(sigma2_per)) elif detector == "linear": h0 = scipy.stats.chi(nu, scale=np.sqrt(sigma2_per)) - h0 = _sum_distribution(h0, n_nc) + h0 = sum_distribution(h0, n_nc) elif detector == "square-law": h0 = scipy.stats.chi2(nu * n_nc, scale=sigma2_per) @@ -488,58 +490,13 @@ def _h1( else: # Folded normal distribution has 1 degree of freedom h1 = scipy.stats.foldnorm(np.sqrt(lambda_), scale=np.sqrt(sigma2_per)) - h1 = _sum_distribution(h1, n_nc) + h1 = sum_distribution(h1, n_nc) elif detector == "square-law": h1 = scipy.stats.ncx2(nu * n_nc, lambda_ * n_nc, scale=sigma2_per) return h1 -def _sum_distribution( - dist: scipy.stats.rv_continuous, n_nc: int -) -> scipy.stats.rv_histogram | scipy.stats.rv_continuous: - r""" - Sums a distribution `n_nc` times. - - Arguments: - dist: The distribution to sum. - n_nc: The number of times to sum the distribution. - - Returns: - The summed distribution. - """ - if n_nc == 1: - return dist - - # Determine mean and standard deviation of base distribution - mu, sigma2 = dist.stats() - sigma = np.sqrt(sigma2) - - # NOTE: I was only able to get this to work with x starting at 0. When the x axis start below zero, - # I couldn't get the correct offset for the convolved x axis. - - # Compute the PDF of the base distribution for 10 standard deviations about the mean - pdf_x = np.linspace(0, mu + 10 * sigma, 1_001) - pdf_y = dist.pdf(pdf_x) - dx = np.mean(np.diff(pdf_x)) - - # The PDF of the sum of n_nc independent random variables is the convolution of the PDF of the base distribution. - # This is efficiently computed in the frequency domain by exponentiating the FFT. The FFT must be zero-padded - # enough that the circular convolutions do not wrap around. - n_fft = scipy.fft.next_fast_len(pdf_y.size * n_nc) - Y = np.fft.fft(pdf_y, n_fft) - Y = Y**n_nc - y = np.fft.ifft(Y).real - y /= y.sum() * dx - x = np.arange(y.size) * dx + pdf_x[0] - - # Adjust the histograms bins to be on either side of each point. So there is one extra point added. - x = np.append(x, x[-1] + dx) - x -= dx / 2 - - return scipy.stats.rv_histogram((y, x)) - - @export def p_d( snr: npt.ArrayLike, @@ -915,6 +872,82 @@ def _calculate(p_fa, sigma2): return convert_output(threshold) +@export +def threshold_factor( + p_fa: npt.ArrayLike, + detector: Literal["coherent", "linear", "square-law"] = "square-law", + complex: bool = True, + n_c: int = 1, + n_nc: int | None = None, + db: bool = False, +) -> npt.NDArray[np.float64]: + r""" + Computes the theoretical detection threshold factor $\alpha$. + + Arguments: + p_fa: The desired probability of false alarm $P_{fa}$ in $(0, 1)$. + detector: The detector type. + + - `"coherent"`: A coherent detector, $$T(x) = \mathrm{Re}\left\{\sum_{i=0}^{N_c-1} x[n-i]\right\} .$$ + - `"linear"`: A linear detector, $$T(x) = \sum_{j=0}^{N_{nc}-1}\left|\sum_{i=0}^{N_c-1} x[n-i-jN_c]\right| .$$ + - `"square-law"`: A square-law detector, $$T(x) = \sum_{j=0}^{N_{nc}-1}\left|\sum_{i=0}^{N_c-1} x[n-i-jN_c]\right|^2 .$$ + + complex: Indicates whether the input signal is real or complex. This affects how the SNR is converted + to noise variance. + n_c: The number of samples to coherently integrate $N_c$. + n_nc: The number of samples to non-coherently integrate $N_{nc}$. Non-coherent integration is only allowable + for linear and square-law detectors. + db: Indicates whether to return the detection threshold $\alpha$ in dB. + + Returns: + The detection threshold factor $\alpha$. + + Notes: + The detection threshold factor $\alpha$ is defined as the ratio of the detection threshold $\gamma$ to the + mean of the detector output under the null hypothesis. This is true for linear and square-law detectors. + + $$\alpha = \frac{\gamma}{\frac{1}{N} \sum_{i=1}^{N}\{T(x[i]) \mid \mathcal{H}_0\}}$$ + + For coherent detectors, the detection threshold factor $\alpha$ is defined as the ratio of the detection + threshold $\gamma$ to the mean of the square of the detector output under the null hypothesis. + This is required because the mean of the coherent detector output is zero. + + $$\alpha = \frac{\gamma}{\frac{1}{N} \sum_{i=1}^{N}\{T(x[i])^2 \mid \mathcal{H}_0\}}$$ + + Examples: + .. ipython:: python + + p_fa = np.logspace(-16, -1, 101) # Probability of false alarm + + @savefig sdr_threshold_factor_1.png + plt.figure(); \ + plt.semilogx(p_fa, sdr.threshold_factor(p_fa, detector="coherent"), label="Coherent"); \ + plt.semilogx(p_fa, sdr.threshold_factor(p_fa, detector="linear"), label="Linear"); \ + plt.semilogx(p_fa, sdr.threshold_factor(p_fa, detector="square-law"), label="Square-Law"); \ + plt.xlabel("Probability of false alarm, $P_{fa}$"); \ + plt.ylabel(r"Detection threshold factor, $\alpha$"); \ + plt.legend(title="Detector"); \ + plt.title("Detection threshold factor across false alarm rate"); + + Group: + detection-theory + """ + sigma2 = 1 + gamma = threshold(p_fa, sigma2, detector, complex, n_c, n_nc) + null_hypothesis = h0(sigma2, detector, complex, n_c, n_nc) + + if detector == "coherent": + # Since mean is zero, the variance is equivalent to the mean of the square + alpha = gamma / null_hypothesis.var() + else: + alpha = gamma / null_hypothesis.mean() + + if db: + alpha = to_db(alpha) + + return alpha + + @export def min_snr( p_d: npt.ArrayLike, diff --git a/src/sdr/_filter/_applications.py b/src/sdr/_filter/_applications.py index ec9d5cc93..2e8a70282 100644 --- a/src/sdr/_filter/_applications.py +++ b/src/sdr/_filter/_applications.py @@ -184,7 +184,7 @@ def __init__( window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. If `None`, no window is applied. streaming: Indicates whether to use streaming mode. In streaming mode, previous inputs are - preserved between calls to :meth:`~sdr.Differentiator.__call__()`. + preserved between calls to :meth:`~sdr.Differentiator.__call__`. Examples: See the :ref:`fir-filters` example. @@ -198,8 +198,7 @@ def __init__( h[order // 2] = 0 if window is not None: - h_win = scipy.signal.windows.get_window(window, order + 1, fftbins=False) - h *= h_win + h *= scipy.signal.windows.get_window(window, order + 1, fftbins=False) super().__init__(h, streaming=streaming) diff --git a/src/sdr/_filter/_delay.py b/src/sdr/_filter/_delay.py index a856c70db..e672e2655 100644 --- a/src/sdr/_filter/_delay.py +++ b/src/sdr/_filter/_delay.py @@ -13,7 +13,7 @@ from ._fir import FIR -def _ideal_frac_delay(length: int, delay: float) -> npt.NDArray[np.float64]: +def _ideal_fractional_delay(length: int, delay: float) -> npt.NDArray[np.float64]: """ Returns the ideal fractional delay filter impulse response. """ @@ -23,7 +23,7 @@ def _ideal_frac_delay(length: int, delay: float) -> npt.NDArray[np.float64]: @export -def design_frac_delay_fir( +def fractional_delay_fir( length: int, delay: float, ) -> npt.NDArray[np.float64]: @@ -50,18 +50,18 @@ def design_frac_delay_fir( .. ipython:: python - h_8 = sdr.design_frac_delay_fir(8, 0.25) + h_8 = sdr.fractional_delay_fir(8, 0.25) - @savefig sdr_design_frac_delay_fir_1.png + @savefig sdr_fractional_delay_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h_8); - @savefig sdr_design_frac_delay_fir_2.png + @savefig sdr_fractional_delay_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h_8); \ plt.ylim(-4, 1); - @savefig sdr_design_frac_delay_fir_3.png + @savefig sdr_fractional_delay_fir_3.png plt.figure(); \ sdr.plot.group_delay(h_8); @@ -69,11 +69,11 @@ def design_frac_delay_fir( .. ipython:: python - h_16 = sdr.design_frac_delay_fir(16, 0.25); \ - h_32 = sdr.design_frac_delay_fir(32, 0.25); \ - h_64 = sdr.design_frac_delay_fir(64, 0.25) + h_16 = sdr.fractional_delay_fir(16, 0.25); \ + h_32 = sdr.fractional_delay_fir(32, 0.25); \ + h_64 = sdr.fractional_delay_fir(64, 0.25) - @savefig sdr_design_frac_delay_fir_4.png + @savefig sdr_fractional_delay_fir_4.png plt.figure(); \ sdr.plot.magnitude_response(h_8, label="Length 8"); \ sdr.plot.magnitude_response(h_16, label="Length 16"); \ @@ -81,7 +81,7 @@ def design_frac_delay_fir( sdr.plot.magnitude_response(h_64, label="Length 64"); \ plt.ylim(-4, 1); - @savefig sdr_design_frac_delay_fir_5.png + @savefig sdr_fractional_delay_fir_5.png plt.figure(); \ sdr.plot.group_delay(h_8, label="Length 8"); \ sdr.plot.group_delay(h_16, label="Length 16"); \ @@ -95,7 +95,7 @@ def design_frac_delay_fir( verify_scalar(delay, float=True, inclusive_min=0, inclusive_max=1) N = length - (length % 2) # The length guaranteed to be even - h_ideal = _ideal_frac_delay(N, delay) + h_ideal = _ideal_fractional_delay(N, delay) if N == 2: beta = 0 @@ -181,7 +181,7 @@ def __init__( verify_scalar(length, int=True, inclusive_min=2) verify_scalar(delay, float=True, inclusive_min=0, inclusive_max=1) - h = design_frac_delay_fir(length, delay) + h = fractional_delay_fir(length, delay) super().__init__(h) self._delay = length // 2 - 1 + delay diff --git a/src/sdr/_filter/_design_fir.py b/src/sdr/_filter/_design_fir.py index 8b702f370..acfdc0b86 100644 --- a/src/sdr/_filter/_design_fir.py +++ b/src/sdr/_filter/_design_fir.py @@ -71,50 +71,11 @@ def _ideal_bandstop(order: int, center_freq: float, bandwidth: float) -> npt.NDA return h_ideal -# TODO: Replace with scipy.signal.windows.get_window() -def _window( - order: int, - window: Literal["hamming", "hann", "blackman", "blackman-harris", "chebyshev", "kaiser"] - | npt.ArrayLike - | None = None, - atten: float = 60, -) -> npt.NDArray[np.float64]: - if window is None: - h_window = np.ones(order + 1) - elif isinstance(window, str): - if window == "hamming": - h_window = scipy.signal.windows.hamming(order + 1) - elif window == "hann": - h_window = scipy.signal.windows.hann(order + 1) - elif window == "blackman": - h_window = scipy.signal.windows.blackman(order + 1) - elif window == "blackman-harris": - h_window = scipy.signal.windows.blackmanharris(order + 1) - elif window == "chebyshev": - h_window = scipy.signal.windows.chebwin(order + 1, at=atten) - elif window == "kaiser": - beta = scipy.signal.kaiser_beta(atten) - h_window = scipy.signal.windows.kaiser(order + 1, beta=beta) - else: - raise ValueError( - f"Argument 'window' must be in ['hamming', 'hann', 'blackman', 'blackman-harris', 'chebyshev', 'kaiser'], not {window!r}." - ) - else: - h_window = np.asarray(window) - if not h_window.shape == (order + 1,): - raise ValueError(f"Argument 'window' must be a length-{order + 1} vector, not {h_window.shape}.") - - return h_window - - @export -def design_lowpass_fir( +def lowpass_fir( order: int, cutoff_freq: float, - window: None - | Literal["hamming", "hann", "blackman", "blackman-harris", "chebyshev", "kaiser"] - | npt.ArrayLike = "hamming", - atten: float = 60, + window: str | float | tuple | None = "hamming", ) -> npt.NDArray[np.float64]: r""" Designs a lowpass FIR filter impulse response $h[n]$ using the window method. @@ -122,18 +83,8 @@ def design_lowpass_fir( Arguments: order: The filter order $N$. Must be even. cutoff_freq: The cutoff frequency $f_c$, normalized to the Nyquist frequency $f_s / 2$. - window: The time-domain window to use. - - - `None`: No windowing. Equivalently, a length-$N + 1$ vector of ones. - - `"hamming"`: Hamming window, see :func:`scipy.signal.windows.hamming`. - - `"hann"`: Hann window, see :func:`scipy.signal.windows.hann`. - - `"blackman"`: Blackman window, see :func:`scipy.signal.windows.blackman`. - - `"blackman-harris"`: Blackman-Harris window, see :func:`scipy.signal.windows.blackmanharris`. - - `"chebyshev"`: Chebyshev window, see :func:`scipy.signal.windows.chebwin`. - - `"kaiser"`: Kaiser window, see :func:`scipy.signal.windows.kaiser`. - - `npt.ArrayLike`: A custom window. Must be a length-$N + 1$ vector. - - atten: The sidelobe attenuation in dB. Only used if `window` is `"chebyshev"` or `"kaiser"`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. Returns: The filter impulse response $h[n]$ with length $N + 1$. The center of the passband has 0 dB gain. @@ -146,13 +97,13 @@ def design_lowpass_fir( .. ipython:: python - h_hamming = sdr.design_lowpass_fir(100, 0.2, window="hamming") + h_hamming = sdr.lowpass_fir(100, 0.2, window="hamming") - @savefig sdr_design_lowpass_fir_1.png + @savefig sdr_lowpass_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h_hamming); - @savefig sdr_design_lowpass_fir_2.png + @savefig sdr_lowpass_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming); @@ -160,13 +111,13 @@ def design_lowpass_fir( .. ipython:: python - h_hann = sdr.design_lowpass_fir(100, 0.2, window="hann"); \ - h_blackman = sdr.design_lowpass_fir(100, 0.2, window="blackman"); \ - h_blackman_harris = sdr.design_lowpass_fir(100, 0.2, window="blackman-harris"); \ - h_chebyshev = sdr.design_lowpass_fir(100, 0.2, window="chebyshev"); \ - h_kaiser = sdr.design_lowpass_fir(100, 0.2, window="kaiser") + h_hann = sdr.lowpass_fir(100, 0.2, window="hann"); \ + h_blackman = sdr.lowpass_fir(100, 0.2, window="blackman"); \ + h_blackman_harris = sdr.lowpass_fir(100, 0.2, window="blackmanharris"); \ + h_chebyshev = sdr.lowpass_fir(100, 0.2, window=("chebwin", 60)); \ + h_kaiser = sdr.lowpass_fir(100, 0.2, window=("kaiser", 0.5)) - @savefig sdr_design_lowpass_fir_3.png + @savefig sdr_lowpass_fir_3.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming, label="Hamming"); \ sdr.plot.magnitude_response(h_hann, label="Hann"); \ @@ -181,24 +132,20 @@ def design_lowpass_fir( """ verify_scalar(order, int=True, even=True) verify_scalar(cutoff_freq, float=True, inclusive_min=0, inclusive_max=1) - verify_scalar(atten, float=True, non_negative=True) - h_ideal = _ideal_lowpass(order, cutoff_freq) - h_window = _window(order, window, atten=atten) - h = h_ideal * h_window + h = _ideal_lowpass(order, cutoff_freq) + if window is not None: + h *= scipy.signal.windows.get_window(window, h.size, fftbins=False) h = _normalize_passband(h, 0) return h @export -def design_highpass_fir( +def highpass_fir( order: int, cutoff_freq: float, - window: None - | Literal["hamming", "hann", "blackman", "blackman-harris", "chebyshev", "kaiser"] - | npt.ArrayLike = "hamming", - atten: float = 60, + window: str | float | tuple | None = "hamming", ) -> npt.NDArray[np.float64]: r""" Designs a highpass FIR filter impulse response $h[n]$ using the window method. @@ -206,18 +153,8 @@ def design_highpass_fir( Arguments: order: The filter order $N$. Must be even. cutoff_freq: The cutoff frequency $f_c$, normalized to the Nyquist frequency $f_s / 2$. - window: The time-domain window to use. - - - `None`: No windowing. Equivalently, a length-$N + 1$ vector of ones. - - `"hamming"`: Hamming window, see :func:`scipy.signal.windows.hamming`. - - `"hann"`: Hann window, see :func:`scipy.signal.windows.hann`. - - `"blackman"`: Blackman window, see :func:`scipy.signal.windows.blackman`. - - `"blackman-harris"`: Blackman-Harris window, see :func:`scipy.signal.windows.blackmanharris`. - - `"chebyshev"`: Chebyshev window, see :func:`scipy.signal.windows.chebwin`. - - `"kaiser"`: Kaiser window, see :func:`scipy.signal.windows.kaiser`. - - `npt.ArrayLike`: A custom window. Must be a length-$N + 1$ vector. - - atten: The sidelobe attenuation in dB. Only used if `window` is `"chebyshev"` or `"kaiser"`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. Returns: The filter impulse response $h[n]$ with length $N + 1$. The center of the passband has 0 dB gain. @@ -230,13 +167,13 @@ def design_highpass_fir( .. ipython:: python - h_hamming = sdr.design_highpass_fir(100, 0.7, window="hamming") + h_hamming = sdr.highpass_fir(100, 0.7, window="hamming") - @savefig sdr_design_highpass_fir_1.png + @savefig sdr_highpass_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h_hamming); - @savefig sdr_design_highpass_fir_2.png + @savefig sdr_highpass_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming); @@ -244,13 +181,13 @@ def design_highpass_fir( .. ipython:: python - h_hann = sdr.design_highpass_fir(100, 0.7, window="hann"); \ - h_blackman = sdr.design_highpass_fir(100, 0.7, window="blackman"); \ - h_blackman_harris = sdr.design_highpass_fir(100, 0.7, window="blackman-harris"); \ - h_chebyshev = sdr.design_highpass_fir(100, 0.7, window="chebyshev"); \ - h_kaiser = sdr.design_highpass_fir(100, 0.7, window="kaiser") + h_hann = sdr.highpass_fir(100, 0.7, window="hann"); \ + h_blackman = sdr.highpass_fir(100, 0.7, window="blackman"); \ + h_blackman_harris = sdr.highpass_fir(100, 0.7, window="blackmanharris"); \ + h_chebyshev = sdr.highpass_fir(100, 0.7, window=("chebwin", 60)); \ + h_kaiser = sdr.highpass_fir(100, 0.7, window=("kaiser", 0.5)) - @savefig sdr_design_highpass_fir_3.png + @savefig sdr_highpass_fir_3.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming, label="Hamming"); \ sdr.plot.magnitude_response(h_hann, label="Hann"); \ @@ -265,25 +202,21 @@ def design_highpass_fir( """ verify_scalar(order, int=True, even=True) verify_scalar(cutoff_freq, float=True, inclusive_min=0, inclusive_max=1) - verify_scalar(atten, float=True, non_negative=True) - h_ideal = _ideal_highpass(order, cutoff_freq) - h_window = _window(order, window, atten=atten) - h = h_ideal * h_window + h = _ideal_highpass(order, cutoff_freq) + if window is not None: + h *= scipy.signal.windows.get_window(window, h.size, fftbins=False) h = _normalize_passband(h, 1) return h @export -def design_bandpass_fir( +def bandpass_fir( order: int, center_freq: float, bandwidth: float, - window: None - | Literal["hamming", "hann", "blackman", "blackman-harris", "chebyshev", "kaiser"] - | npt.ArrayLike = "hamming", - atten: float = 60, + window: str | float | tuple | None = "hamming", ) -> npt.NDArray[np.float64]: r""" Designs a bandpass FIR filter impulse response $h[n]$ using the window method. @@ -292,18 +225,8 @@ def design_bandpass_fir( order: The filter order $N$. Must be even. center_freq: The center frequency $f_{center}$, normalized to the Nyquist frequency $f_s / 2$. bandwidth: The two-sided bandwidth about $f_{center}$, normalized to the Nyquist frequency $f_s / 2$. - window: The time-domain window to use. - - - `None`: No windowing. Equivalently, a length-$N + 1$ vector of ones. - - `"hamming"`: Hamming window, see :func:`scipy.signal.windows.hamming`. - - `"hann"`: Hann window, see :func:`scipy.signal.windows.hann`. - - `"blackman"`: Blackman window, see :func:`scipy.signal.windows.blackman`. - - `"blackman-harris"`: Blackman-Harris window, see :func:`scipy.signal.windows.blackmanharris`. - - `"chebyshev"`: Chebyshev window, see :func:`scipy.signal.windows.chebwin`. - - `"kaiser"`: Kaiser window, see :func:`scipy.signal.windows.kaiser`. - - `npt.ArrayLike`: A custom window. Must be a length-$N + 1$ vector. - - atten: The sidelobe attenuation in dB. Only used if `window` is `"chebyshev"` or `"kaiser"`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. Returns: The filter impulse response $h[n]$ with length $N + 1$. The center of the passband has 0 dB gain. @@ -317,13 +240,13 @@ def design_bandpass_fir( .. ipython:: python - h_hamming = sdr.design_bandpass_fir(100, 0.4, 0.1, window="hamming") + h_hamming = sdr.bandpass_fir(100, 0.4, 0.1, window="hamming") - @savefig sdr_design_bandpass_fir_1.png + @savefig sdr_bandpass_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h_hamming); - @savefig sdr_design_bandpass_fir_2.png + @savefig sdr_bandpass_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming); @@ -331,13 +254,13 @@ def design_bandpass_fir( .. ipython:: python - h_hann = sdr.design_bandpass_fir(100, 0.4, 0.1, window="hann"); \ - h_blackman = sdr.design_bandpass_fir(100, 0.4, 0.1, window="blackman"); \ - h_blackman_harris = sdr.design_bandpass_fir(100, 0.4, 0.1, window="blackman-harris"); \ - h_chebyshev = sdr.design_bandpass_fir(100, 0.4, 0.1, window="chebyshev"); \ - h_kaiser = sdr.design_bandpass_fir(100, 0.4, 0.1, window="kaiser") + h_hann = sdr.bandpass_fir(100, 0.4, 0.1, window="hann"); \ + h_blackman = sdr.bandpass_fir(100, 0.4, 0.1, window="blackman"); \ + h_blackman_harris = sdr.bandpass_fir(100, 0.4, 0.1, window="blackmanharris"); \ + h_chebyshev = sdr.bandpass_fir(100, 0.4, 0.1, window=("chebwin", 60)); \ + h_kaiser = sdr.bandpass_fir(100, 0.4, 0.1, window=("kaiser", 0.5)) - @savefig sdr_design_bandpass_fir_3.png + @savefig sdr_bandpass_fir_3.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming, label="Hamming"); \ sdr.plot.magnitude_response(h_hann, label="Hann"); \ @@ -353,25 +276,21 @@ def design_bandpass_fir( verify_scalar(order, int=True, even=True) verify_scalar(center_freq, float=True, inclusive_min=0, inclusive_max=1) verify_scalar(bandwidth, float=True, inclusive_min=0, inclusive_max=2 * min(center_freq, 1 - center_freq)) - verify_scalar(atten, float=True, non_negative=True) - h_ideal = _ideal_bandpass(order, center_freq, bandwidth) - h_window = _window(order, window, atten=atten) - h = h_ideal * h_window + h = _ideal_bandpass(order, center_freq, bandwidth) + if window is not None: + h *= scipy.signal.windows.get_window(window, h.size, fftbins=False) h = _normalize_passband(h, center_freq) return h @export -def design_bandstop_fir( +def bandstop_fir( order: int, center_freq: float, bandwidth: float, - window: None - | Literal["hamming", "hann", "blackman", "blackman-harris", "chebyshev", "kaiser"] - | npt.ArrayLike = "hamming", - atten: float = 60, + window: str | float | tuple | None = "hamming", ) -> npt.NDArray[np.float64]: r""" Designs a bandstop FIR filter impulse response $h[n]$ using the window method. @@ -380,18 +299,8 @@ def design_bandstop_fir( order: The filter order $N$. Must be even. center_freq: The center frequency $f_{center}$, normalized to the Nyquist frequency $f_s / 2$. bandwidth: The two-sided bandwidth about $f_{center}$, normalized to the Nyquist frequency $f_s / 2$. - window: The time-domain window to use. - - - `None`: No windowing. Equivalently, a length-$N + 1$ vector of ones. - - `"hamming"`: Hamming window, see :func:`scipy.signal.windows.hamming`. - - `"hann"`: Hann window, see :func:`scipy.signal.windows.hann`. - - `"blackman"`: Blackman window, see :func:`scipy.signal.windows.blackman`. - - `"blackman-harris"`: Blackman-Harris window, see :func:`scipy.signal.windows.blackmanharris`. - - `"chebyshev"`: Chebyshev window, see :func:`scipy.signal.windows.chebwin`. - - `"kaiser"`: Kaiser window, see :func:`scipy.signal.windows.kaiser`. - - `npt.ArrayLike`: A custom window. Must be a length-$N + 1$ vector. - - atten: The sidelobe attenuation in dB. Only used if `window` is `"chebyshev"` or `"kaiser"`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. Returns: The filter impulse response $h[n]$ with length $N + 1$. The center of the larger passband has 0 dB gain. @@ -405,13 +314,13 @@ def design_bandstop_fir( .. ipython:: python - h_hamming = sdr.design_bandstop_fir(100, 0.4, 0.75, window="hamming") + h_hamming = sdr.bandstop_fir(100, 0.4, 0.75, window="hamming") - @savefig sdr_design_bandstop_fir_1.png + @savefig sdr_bandstop_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h_hamming); - @savefig sdr_design_bandstop_fir_2.png + @savefig sdr_bandstop_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming); @@ -419,13 +328,13 @@ def design_bandstop_fir( .. ipython:: python - h_hann = sdr.design_bandstop_fir(100, 0.4, 0.75, window="hann"); \ - h_blackman = sdr.design_bandstop_fir(100, 0.4, 0.75, window="blackman"); \ - h_blackman_harris = sdr.design_bandstop_fir(100, 0.4, 0.75, window="blackman-harris"); \ - h_chebyshev = sdr.design_bandstop_fir(100, 0.4, 0.75, window="chebyshev"); \ - h_kaiser = sdr.design_bandstop_fir(100, 0.4, 0.75, window="kaiser") + h_hann = sdr.bandstop_fir(100, 0.4, 0.75, window="hann"); \ + h_blackman = sdr.bandstop_fir(100, 0.4, 0.75, window="blackman"); \ + h_blackman_harris = sdr.bandstop_fir(100, 0.4, 0.75, window="blackmanharris"); \ + h_chebyshev = sdr.bandstop_fir(100, 0.4, 0.75, window=("chebwin", 60)); \ + h_kaiser = sdr.bandstop_fir(100, 0.4, 0.75, window=("kaiser", 0.5)) - @savefig sdr_design_bandstop_fir_3.png + @savefig sdr_bandstop_fir_3.png plt.figure(); \ sdr.plot.magnitude_response(h_hamming, label="Hamming"); \ sdr.plot.magnitude_response(h_hann, label="Hann"); \ @@ -441,11 +350,10 @@ def design_bandstop_fir( verify_scalar(order, int=True, even=True) verify_scalar(center_freq, float=True, inclusive_min=0, inclusive_max=1) verify_scalar(bandwidth, float=True, inclusive_min=0, inclusive_max=2 * min(center_freq, 1 - center_freq)) - verify_scalar(atten, float=True, non_negative=True) - h_ideal = _ideal_bandstop(order, center_freq, bandwidth) - h_window = _window(order, window, atten=atten) - h = h_ideal * h_window + h = _ideal_bandstop(order, center_freq, bandwidth) + if window is not None: + h *= scipy.signal.windows.get_window(window, h.size, fftbins=False) if center_freq > 0.5: h = _normalize_passband(h, 0) else: diff --git a/src/sdr/_filter/_design_multirate.py b/src/sdr/_filter/_design_multirate.py index ed0be95bf..3269874e2 100644 --- a/src/sdr/_filter/_design_multirate.py +++ b/src/sdr/_filter/_design_multirate.py @@ -14,7 +14,7 @@ @export -def design_multirate_fir( +def multirate_fir( interpolation: int, decimation: int = 1, polyphase_order: int = 23, @@ -41,13 +41,13 @@ def design_multirate_fir( .. ipython:: python - h = sdr.design_multirate_fir(11, 3) + h = sdr.multirate_fir(11, 3) - @savefig sdr_design_multirate_fir_1.png + @savefig sdr_multirate_fir_1.png plt.figure(); \ sdr.plot.impulse_response(h); - @savefig sdr_design_multirate_fir_2.png + @savefig sdr_multirate_fir_2.png plt.figure(); \ sdr.plot.magnitude_response(h); @@ -96,7 +96,7 @@ def design_multirate_fir( return h -def design_multirate_fir_linear(rate: int) -> npt.NDArray[np.float64]: +def multirate_fir_linear(rate: int) -> npt.NDArray[np.float64]: r""" The multirate filter is designed to linearly interpolate between samples. @@ -109,7 +109,7 @@ def design_multirate_fir_linear(rate: int) -> npt.NDArray[np.float64]: return h -def design_multirate_fir_linear_matlab(rate: int) -> npt.NDArray[np.float64]: +def multirate_fir_linear_matlab(rate: int) -> npt.NDArray[np.float64]: r""" The multirate filter is designed to linearly interpolate between samples. @@ -122,7 +122,7 @@ def design_multirate_fir_linear_matlab(rate: int) -> npt.NDArray[np.float64]: return h -def design_multirate_fir_zoh(rate: int) -> npt.NDArray[np.float64]: +def multirate_fir_zoh(rate: int) -> npt.NDArray[np.float64]: """ The multirate filter is designed to be a zero-order hold. diff --git a/src/sdr/_filter/_fir.py b/src/sdr/_filter/_fir.py index 3d6e35b74..f0bce8c56 100644 --- a/src/sdr/_filter/_fir.py +++ b/src/sdr/_filter/_fir.py @@ -8,6 +8,7 @@ import numpy as np import numpy.typing as npt +import scipy.integrate import scipy.signal from typing_extensions import Literal @@ -323,7 +324,7 @@ def frequency_response( Examples: .. ipython:: python - h = sdr.design_lowpass_fir(100, 0.2, window="hamming"); \ + h = sdr.lowpass_fir(100, 0.2, window="hamming"); \ fir = sdr.FIR(h) Compute the frequency response at 1024 evenly spaced frequencies. @@ -418,6 +419,30 @@ def phase_delay(self, sample_rate: float = 1.0, N: int = 1024) -> tuple[npt.NDAr return f, tau_phi + def noise_bandwidth(self, sample_rate: float = 1.0) -> float: + r""" + Returns the noise bandwidth $B_n$ of the FIR filter. + + Arguments: + sample_rate: The sample rate $f_s$ of the filter in samples/s. + + Returns: + The noise bandwidth of the FIR filter $B_n$ in Hz. + + Examples: + See the :ref:`fir-filters` example. + """ + verify_scalar(sample_rate, float=True, positive=True) + + # Compute the frequency response + f, H = scipy.signal.freqz(self.taps, 1, worN=2**20, whole=True, fs=sample_rate) + H = np.abs(H) ** 2 + + # Compute the noise bandwidth + B_n = scipy.integrate.simpson(y=H, x=f) / np.max(H) + + return B_n + ############################################################################## # Properties ############################################################################## diff --git a/src/sdr/_filter/_iir.py b/src/sdr/_filter/_iir.py index c9a8214be..61d1912b5 100644 --- a/src/sdr/_filter/_iir.py +++ b/src/sdr/_filter/_iir.py @@ -8,6 +8,7 @@ import numpy as np import numpy.typing as npt +import scipy.integrate import scipy.signal from typing_extensions import Self @@ -347,6 +348,30 @@ def frequency_response( else: return H + def noise_bandwidth(self, sample_rate: float = 1.0) -> float: + r""" + Returns the noise bandwidth $B_n$ of the IIR filter. + + Arguments: + sample_rate: The sample rate $f_s$ of the filter in samples/s. + + Returns: + The noise bandwidth of the IIR filter $B_n$ in Hz. + + Examples: + See the :ref:`iir-filters` example. + """ + verify_scalar(sample_rate, float=True, positive=True) + + # Compute the frequency response + f, H = scipy.signal.freqz(self.b_taps, self.a_taps, worN=2**20, whole=True, fs=sample_rate) + H = np.abs(H) ** 2 + + # Compute the noise bandwidth + B_n = scipy.integrate.simpson(y=H, x=f) / np.max(H) + + return B_n + ############################################################################## # Properties ############################################################################## diff --git a/src/sdr/_filter/_polyphase.py b/src/sdr/_filter/_polyphase.py index 2e7f0a018..9bb0871cc 100644 --- a/src/sdr/_filter/_polyphase.py +++ b/src/sdr/_filter/_polyphase.py @@ -11,10 +11,10 @@ from .._helper import convert_output, export, verify_arraylike, verify_bool, verify_literal, verify_scalar from ._design_multirate import ( - design_multirate_fir, - design_multirate_fir_linear, - design_multirate_fir_linear_matlab, - design_multirate_fir_zoh, + multirate_fir, + multirate_fir_linear, + multirate_fir_linear_matlab, + multirate_fir_zoh, polyphase_decompose, ) from ._fir import FIR @@ -640,7 +640,7 @@ def __init__( interpolation: The interpolation rate $P$. taps: The prototype filter design specification. - - `"kaiser"`: The prototype filter is designed using :func:`~sdr.design_multirate_fir()` + - `"kaiser"`: The prototype filter is designed using :func:`sdr.multirate_fir` with arguments `interpolation` and 1. - `"linear"`: The prototype filter is designed to linearly interpolate between samples. The filter coefficients are a length-$2P$ linear ramp $\frac{1}{P} [0, ..., P-1, P, P-1, ..., 1]$. @@ -670,13 +670,13 @@ def __init__( else: self._method = verify_literal(taps, ["kaiser", "linear", "linear-matlab", "zoh"]) if taps == "kaiser": - taps = design_multirate_fir(interpolation, 1, polyphase_order, atten) + taps = multirate_fir(interpolation, 1, polyphase_order, atten) elif taps == "linear": - taps = design_multirate_fir_linear(interpolation) + taps = multirate_fir_linear(interpolation) elif taps == "linear-matlab": - taps = design_multirate_fir_linear_matlab(interpolation) + taps = multirate_fir_linear_matlab(interpolation) elif taps == "zoh": - taps = design_multirate_fir_zoh(interpolation) + taps = multirate_fir_zoh(interpolation) super().__init__(interpolation, taps, input="hold", output="top-to-bottom", streaming=streaming) @@ -818,7 +818,7 @@ def __init__( decimation: The decimation rate $Q$. taps: The prototype filter design specification. - - `"kaiser"`: The prototype filter is designed using :func:`~sdr.design_multirate_fir()` + - `"kaiser"`: The prototype filter is designed using :func:`sdr.multirate_fir` with arguments 1 and `decimation`. - `npt.ArrayLike`: The prototype filter feedforward coefficients $h[n]$. @@ -836,7 +836,7 @@ def __init__( taps = verify_arraylike(taps, atleast_1d=True, ndim=1) else: self._method = verify_literal(taps, ["kaiser"]) - taps = design_multirate_fir(1, decimation, polyphase_order, atten) + taps = multirate_fir(1, decimation, polyphase_order, atten) super().__init__(decimation, taps, input="bottom-to-top", output="sum", streaming=streaming) @@ -1004,7 +1004,7 @@ def __init__( decimation: The decimation rate $Q$. taps: The prototype filter design specification. - - `"kaiser"`: The prototype filter is designed using :func:`~sdr.design_multirate_fir()` + - `"kaiser"`: The prototype filter is designed using :func:`sdr.multirate_fir` with arguments `interpolation` and `decimation`. - `"linear"`: The prototype filter is designed to linearly interpolate between samples. The filter coefficients are a length-$2P$ linear ramp $\frac{1}{P} [0, ..., P-1, P, P-1, ..., 1]$. @@ -1032,15 +1032,15 @@ def __init__( else: self._method = verify_literal(taps, ["kaiser", "linear", "linear-matlab", "zoh"]) if taps == "kaiser": - taps = design_multirate_fir(interpolation, decimation, polyphase_order, atten) + taps = multirate_fir(interpolation, decimation, polyphase_order, atten) else: verify_scalar(interpolation, int=True, exclusive_min=1) if taps == "linear": - taps = design_multirate_fir_linear(interpolation) + taps = multirate_fir_linear(interpolation) elif taps == "linear-matlab": - taps = design_multirate_fir_linear_matlab(interpolation) + taps = multirate_fir_linear_matlab(interpolation) elif taps == "zoh": - taps = design_multirate_fir_zoh(interpolation) + taps = multirate_fir_zoh(interpolation) if interpolation == 1: # PolyphaseFIR configured like Decimator @@ -1210,7 +1210,7 @@ def __init__( channels: The number of channels $C$. taps: The prototype filter design specification. - - `"kaiser"`: The prototype filter is designed using :func:`~sdr.design_multirate_fir()` + - `"kaiser"`: The prototype filter is designed using :func:`sdr.multirate_fir` with arguments 1 and `rate`. - `npt.ArrayLike`: The prototype filter feedforward coefficients $h[n]$. @@ -1228,7 +1228,7 @@ def __init__( taps = verify_arraylike(taps, atleast_1d=True, ndim=1) else: self._method = verify_literal(taps, ["kaiser"]) - taps = design_multirate_fir(1, channels, polyphase_order, atten) + taps = multirate_fir(1, channels, polyphase_order, atten) super().__init__(channels, taps, input="bottom-to-top", output="all", streaming=streaming) diff --git a/src/sdr/_link_budget/_antenna.py b/src/sdr/_link_budget/_antenna.py index e82cdeb7b..a6edb142e 100644 --- a/src/sdr/_link_budget/_antenna.py +++ b/src/sdr/_link_budget/_antenna.py @@ -91,7 +91,9 @@ def parabolic_antenna( G = (np.pi * diameter / lambda_) ** 2 * efficiency # Gain in linear units G = db(G) # Gain in dBi - theta = np.arcsin(3.83 * lambda_ / (np.pi * diameter)) # Beamwidth in radians + with np.errstate(invalid="ignore"): + # If the argument is greater than 1, arcsin returns NaN, which is fine + theta = np.arcsin(3.83 * lambda_ / (np.pi * diameter)) # Beamwidth in radians theta = np.rad2deg(theta) # Beamwidth in degrees return convert_output(G), convert_output(theta) diff --git a/src/sdr/_link_budget/_path_loss.py b/src/sdr/_link_budget/_path_loss.py index ed1b372b9..7b7f19a87 100644 --- a/src/sdr/_link_budget/_path_loss.py +++ b/src/sdr/_link_budget/_path_loss.py @@ -68,6 +68,43 @@ def free_space_path_loss( plt.xlabel('Distance (m)'); \ plt.ylabel('Free-space path loss (dB)'); + It is confusing why free-space path loss is proportional to the square of frequency. Is RF energy attenuated + more at higher frequencies? The answer is no. The reason the FSPL equation has a frequency + dependence is that it assumes omnidirectional, isotropic antennas are used at both the transmitter and + receiver. The isotropic antenna has a gain of 0 dBi. The physical size of a roughly isotropic antenna + (think dipole) is a function of frequency as well. So, as the frequency increases, the physical size of the + isotropic antenna decreases. But what if the size of the antenna was fixed across frequency, as is the case + with a parabolic dish antenna? You'll note that the gain of a parabolic dish antenna is also proportional to + the square of the frequency, see :func:`sdr.parabolic_antenna`. + + It turns out that if omnidirectional antennas are used at both the transmitter and receiver, the total path + loss increases with frequency. But if a parabolic reflector is used at one end, the total path loss is + constant across frequency. Furthermore, if a parabolic reflector is used at both ends, as is the case in + VSAT systems, the total path loss decreases with frequency. + + .. ipython:: python + + freq = np.linspace(1e6, 40e9, 1_001) + + # Free-space path loss at 1 km + fspl = sdr.free_space_path_loss(1e3, freq) + + # Isotropic antenna gain in dBi + iso = 0 + + # 1-meter diameter parabolic dish antenna gain in dBi + par = sdr.parabolic_antenna(freq, 1)[0] + + @savefig sdr_fspl_2.png + plt.figure(); \ + plt.plot(freq / 1e9, fspl - iso - iso, label="Isotropic -> Isotropic"); \ + plt.plot(freq / 1e9, fspl - iso - par, label="Isotropic -> Parabolic"); \ + plt.plot(freq / 1e9, fspl - par - par, label="Parabolic -> Parabolic"); \ + plt.legend(title="Antennas", loc="center right"); \ + plt.xlabel("Frequency (GHz), $f$"); \ + plt.ylabel("Path loss (dB)"); \ + plt.title("Path loss across center frequency"); + Group: link-budget-path-losses """ diff --git a/src/sdr/_probability.py b/src/sdr/_probability.py index 655d95715..c2e3ab5f7 100644 --- a/src/sdr/_probability.py +++ b/src/sdr/_probability.py @@ -8,7 +8,7 @@ import numpy.typing as npt import scipy.special -from ._helper import convert_output, export, verify_arraylike +from ._helper import convert_output, export, verify_arraylike, verify_scalar @export @@ -75,3 +75,340 @@ def Qinv(p: npt.ArrayLike) -> npt.NDArray[np.float64]: x = np.sqrt(2) * scipy.special.erfcinv(2 * p) return convert_output(x) + + +@export +def sum_distribution( + X: scipy.stats.rv_continuous | scipy.stats.rv_histogram, + n_terms: int, + p: float = 1e-16, +) -> scipy.stats.rv_histogram: + r""" + Numerically calculates the distribution of the sum of $n$ i.i.d. random variables $X_i$. + + Arguments: + X: The distribution of the i.i.d. random variables $X_i$. + n_terms: The number $n$ of random variables to sum. + p: The probability of exceeding the x axis, on either side, for each distribution. This is used to determine + the bounds on the x axis for the numerical convolution. Smaller values of $p$ will result in more accurate + analysis, but will require more computation. + + Returns: + The distribution of the sum $Z = X_1 + X_2 + \ldots + X_n$. + + Notes: + The PDF of the sum of $n$ independent random variables is the convolution of the PDF of the base distribution. + + $$f_{X_1 + X_2 + \ldots + X_n}(t) = (f_X * f_X * \ldots * f_X)(t)$$ + + Examples: + Compute the distribution of the sum of two normal distributions. + + .. ipython:: python + + X = scipy.stats.norm(loc=-1, scale=0.5) + n_terms = 2 + x = np.linspace(-6, 2, 1_001) + + @savefig sdr_sum_distribution_1.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X"); \ + plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of two Normal distributions"); + + Compute the distribution of the sum of three Rayleigh distributions. + + .. ipython:: python + + X = scipy.stats.rayleigh(scale=1) + n_terms = 3 + x = np.linspace(0, 10, 1_001) + + @savefig sdr_sum_distribution_2.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X + X"); \ + plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X + X empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of three Rayleigh distributions"); + + Compute the distribution of the sum of four Rician distributions. + + .. ipython:: python + + X = scipy.stats.rice(2) + n_terms = 4 + x = np.linspace(0, 18, 1_001) + + @savefig sdr_sum_distribution_3.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X + X + X"); \ + plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X + X + X empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of four Rician distributions"); + + Group: + probability + """ + verify_scalar(n_terms, int=True, positive=True) + verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1) + + if n_terms == 1: + return X + + # Determine the x axis of each distribution such that the probability of exceeding the x axis, on either side, + # is p. + x1_min, x1_max = _x_range(X, p) + x = np.linspace(x1_min, x1_max, 1_001) + dx = np.mean(np.diff(x)) + + # Compute the PDF of the base distribution + f_X = X.pdf(x) + + # The PDF of the sum of n_terms independent random variables is the convolution of the PDF of the base distribution. + # This is efficiently computed in the frequency domain by exponentiating the FFT. The FFT must be zero-padded + # enough that the circular convolutions do not wrap around. + n_fft = scipy.fft.next_fast_len(f_X.size * n_terms) + f_X_fft = np.fft.fft(f_X, n_fft) + f_X_fft = f_X_fft**n_terms + f_Y = np.fft.ifft(f_X_fft).real + f_Y /= f_Y.sum() * dx + x = np.arange(f_Y.size) * dx + x[0] * (n_terms) + + # Adjust the histograms bins to be on either side of each point. So there is one extra point added. + x = np.append(x, x[-1] + dx) + x -= dx / 2 + + return scipy.stats.rv_histogram((f_Y, x)) + + +@export +def sum_distributions( + X: scipy.stats.rv_continuous | scipy.stats.rv_histogram, + Y: scipy.stats.rv_continuous | scipy.stats.rv_histogram, + p: float = 1e-16, +) -> scipy.stats.rv_histogram: + r""" + Numerically calculates the distribution of the sum of two independent random variables $X$ and $Y$. + + Arguments: + X: The distribution of the first random variable $X$. + Y: The distribution of the second random variable $Y$. + p: The probability of exceeding the x axis, on either side, for each distribution. This is used to determine + the bounds on the x axis for the numerical convolution. Smaller values of $p$ will result in more accurate + analysis, but will require more computation. + + Returns: + The distribution of the sum $Z = X + Y$. + + Notes: + The PDF of the sum of two independent random variables is the convolution of the PDF of the two distributions. + + $$f_{X+Y}(t) = (f_X * f_Y)(t)$$ + + Examples: + Compute the distribution of the sum of two normal distributions. + + .. ipython:: python + + X = scipy.stats.norm(loc=-1, scale=0.5) + Y = scipy.stats.norm(loc=2, scale=1.5) + x = np.linspace(-5, 10, 1_001) + + @savefig sdr_sum_distributions_1.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, Y.pdf(x), label="Y"); \ + plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \ + plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of two Normal distributions"); + + Compute the distribution of the sum of two Rayleigh distributions. + + .. ipython:: python + + X = scipy.stats.rayleigh(scale=1) + Y = scipy.stats.rayleigh(loc=1, scale=2) + x = np.linspace(0, 12, 1_001) + + @savefig sdr_sum_distributions_2.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, Y.pdf(x), label="Y"); \ + plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \ + plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of two Rayleigh distributions"); + + Compute the distribution of the sum of two Rician distributions. + + .. ipython:: python + + X = scipy.stats.rice(2) + Y = scipy.stats.rice(3) + x = np.linspace(0, 12, 1_001) + + @savefig sdr_sum_distributions_3.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, Y.pdf(x), label="Y"); \ + plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \ + plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Sum of two Rician distributions"); + + Group: + probability + """ + verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1) + + # Determine the x axis of each distribution such that the probability of exceeding the x axis, on either side, + # is p. + x1_min, x1_max = _x_range(X, p) + x2_min, x2_max = _x_range(Y, p) + dx1 = (x1_max - x1_min) / 1_000 + dx2 = (x2_max - x2_min) / 1_000 + dx = np.min([dx1, dx2]) # Use the smaller delta x -- must use the same dx for both distributions + x1 = np.arange(x1_min, x1_max, dx) + x2 = np.arange(x2_min, x2_max, dx) + + # Compute the PDF of each distribution + f_X = X.pdf(x1) + f_Y = Y.pdf(x2) + + # The PDF of the sum of two independent random variables is the convolution of the PDF of the two distributions + f_Z = np.convolve(f_X, f_Y, mode="full") * dx + + # Determine the x axis for the output convolution + x = np.arange(f_Z.size) * dx + x1[0] + x2[0] + + # Adjust the histograms bins to be on either side of each point. So there is one extra point added. + x = np.append(x, x[-1] + dx) + x -= dx / 2 + + return scipy.stats.rv_histogram((f_Z, x)) + + +@export +def multiply_distributions( + X: scipy.stats.rv_continuous | scipy.stats.rv_histogram, + Y: scipy.stats.rv_continuous | scipy.stats.rv_histogram, + x: npt.ArrayLike | None = None, + p: float = 1e-10, +) -> scipy.stats.rv_histogram: + r""" + Numerically calculates the distribution of the product of two independent random variables $X$ and $Y$. + + Arguments: + X: The distribution of the first random variable $X$. + Y: The distribution of the second random variable $Y$. + x: The x values at which to evaluate the PDF of the product. If None, the x values are determined based on `p`. + p: The probability of exceeding the x axis, on either side, for each distribution. This is used to determine + the bounds on the x axis for the numerical convolution. Smaller values of $p$ will result in more accurate + analysis, but will require more computation. + + Returns: + The distribution of the product $Z = X \cdot Y$. + + Notes: + The PDF of the product of two independent random variables is calculated as follows. + + $$ + f_{X \cdot Y}(t) = + \int_{0}^{\infty} f_X(k) f_Y(t/k) \frac{1}{k} dk - + \int_{-\infty}^{0} f_X(k) f_Y(t/k) \frac{1}{k} dk + $$ + + Examples: + Compute the distribution of the product of two normal distributions. + + .. ipython:: python + + X = scipy.stats.norm(loc=-1, scale=0.5) + Y = scipy.stats.norm(loc=2, scale=1.5) + x = np.linspace(-15, 10, 1_001) + + @savefig sdr_multiply_distributions_1.png + plt.figure(); \ + plt.plot(x, X.pdf(x), label="X"); \ + plt.plot(x, Y.pdf(x), label="Y"); \ + plt.plot(x, sdr.multiply_distributions(X, Y).pdf(x), label="X * Y"); \ + plt.hist(X.rvs(100_000) * Y.rvs(100_000), bins=101, density=True, histtype="step", label="X * Y empirical"); \ + plt.legend(); \ + plt.xlabel("Random variable"); \ + plt.ylabel("Probability density"); \ + plt.title("Product of two Normal distributions"); + + Group: + probability + """ + verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1) + + if x is None: + # Determine the x axis of each distribution such that the probability of exceeding the x axis, on either side, + # is p. + x1_min, x1_max = _x_range(X, np.sqrt(p)) + x2_min, x2_max = _x_range(Y, np.sqrt(p)) + bounds = np.array([x1_min * x2_min, x1_min * x2_max, x1_max * x2_min, x1_max * x2_max]) + x_min = np.min(bounds) + x_max = np.max(bounds) + x = np.linspace(x_min, x_max, 1_001) + else: + x = verify_arraylike(x, float=True, atleast_1d=True, ndim=1) + x = np.sort(x) + dx = np.mean(np.diff(x)) + + def integrand(k: float, xi: float) -> float: + return X.pdf(k) * Y.pdf(xi / k) * 1 / k + + f_Z = np.zeros_like(x) + for i, xi in enumerate(x): + f_Z[i] = scipy.integrate.quad(integrand, 0, np.inf, args=(xi,))[0] + f_Z[i] -= scipy.integrate.quad(integrand, -np.inf, 0, args=(xi,))[0] + + # Adjust the histograms bins to be on either side of each point. So there is one extra point added. + x = np.append(x, x[-1] + dx) + x -= dx / 2 + + return scipy.stats.rv_histogram((f_Z, x)) + + +def _x_range(X: scipy.stats.rv_continuous, p: float) -> tuple[float, float]: + r""" + Determines the range of x values for a given distribution such that the probability of exceeding the x axis, on + either side, is p. + """ + # Need to have these loops because for very small p, sometimes SciPy will return NaN instead of a valid value. + # The while loops will increase the p value until a valid value is returned. + + pp = p + while True: + x_min = X.ppf(pp) + if not np.isnan(x_min): + break + pp *= 10 + + pp = p + while True: + x_max = X.isf(pp) + if not np.isnan(x_max): + break + pp *= 10 + + return x_min, x_max diff --git a/src/sdr/_sequence/_correlation.py b/src/sdr/_sequence/_correlation.py index bef9c3d0b..fabff417b 100644 --- a/src/sdr/_sequence/_correlation.py +++ b/src/sdr/_sequence/_correlation.py @@ -368,11 +368,11 @@ def gold_code( length: The length $n = 2^m - 1$ of the Gold code/sequence. index: The index $i$ in $[-2, n)$ of the Gold code. poly1: The primitive polynomial of degree $m$ over $\mathrm{GF}(2)$ for the first $m$-sequence. If `None`, - a preferred pair is found using :func:`sdr.preferred_pairs()`. + a preferred pair is found using :func:`sdr.preferred_pairs`. poly2: The primitive polynomial of degree $m$ over $\mathrm{GF}(2)$ for the second $m$-sequence. If `None`, - a preferred pair is found using :func:`sdr.preferred_pairs()`. + a preferred pair is found using :func:`sdr.preferred_pairs`. verify: Indicates whether to verify that the provided polynomials are a preferred pair using - :func:`sdr.is_preferred_pair()`. + :func:`sdr.is_preferred_pair`. output: The output format of the Gold code/sequence. - `"binary"` (default): The Gold code with binary values of 0 and 1. diff --git a/src/sdr/_simulation/_channel_model.py b/src/sdr/_simulation/_channel_model.py index 3a3957a19..e4baea509 100644 --- a/src/sdr/_simulation/_channel_model.py +++ b/src/sdr/_simulation/_channel_model.py @@ -23,7 +23,7 @@ def bsc( Arguments: x: The input sequence $x$ with $x_i \in \{0, 1\}$. p: The probability $p$ of a bit flip. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. Returns: The output sequence $y$ with $y_i \in \{0, 1\}$. @@ -63,7 +63,7 @@ def bec( Arguments: x: The input sequence $x$ with $x_i \in \{0, 1\}$. p: The probability $p$ of a bit erasure. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. Returns: The output sequence $y$ with $y_i \in \{0, 1, e\}$. Erasures $e$ are represented by -1. @@ -109,7 +109,7 @@ def dmc( $\mathcal{X} = \{0, 1, \ldots, m-1\}$. Y: The output alphabet $\mathcal{Y}$ of size $n$. If `None`, it is assumed that $\mathcal{Y} = \{0, 1, \ldots, n-1\}$. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. Returns: The output sequence $y$ with $y_i \in \mathcal{Y}$. @@ -176,7 +176,7 @@ def __init__(self, seed: int | None = None): Creates a new channel. Arguments: - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. """ self._rng: np.random.Generator # Will be set in self.reset() @@ -187,7 +187,7 @@ def reset(self, seed: int | None = None): Resets the channel with a new seed. Arguments: - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. """ self._rng = np.random.default_rng(seed) @@ -287,7 +287,7 @@ def __init__(self, p: float, seed: int | None = None): Arguments: p: The transition probability $p$ of the BSC channel. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. """ self._p = verify_scalar(p, float=True, inclusive_min=0, inclusive_max=1) @@ -418,7 +418,7 @@ def __init__(self, p: float, seed: int | None = None): Arguments: p: The erasure probability $p$ of the BEC channel. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. """ self._p = verify_scalar(p, float=True, inclusive_min=0, inclusive_max=1) @@ -548,7 +548,7 @@ def __init__( P: The $m \times n$ transition probability matrix $P$, where $P = \Pr(Y = y_j | X = x_i)$. X: The input alphabet $\mathcal{X}$ of size $m$. If `None`, it is assumed that $\mathcal{X} = \{0, 1, \ldots, m-1\}$. Y: The output alphabet $\mathcal{Y}$ of size $n$. If `None`, it is assumed that $\mathcal{Y} = \{0, 1, \ldots, n-1\}$. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. """ P = verify_arraylike(P, float=True, ndim=2, inclusive_min=0, inclusive_max=1) verify_condition(np.allclose(P.sum(axis=1), 1)) diff --git a/src/sdr/_simulation/_impairment.py b/src/sdr/_simulation/_impairment.py index afc37ba04..90279871d 100644 --- a/src/sdr/_simulation/_impairment.py +++ b/src/sdr/_simulation/_impairment.py @@ -30,7 +30,7 @@ def awgn( desired noise variance can be computed and passed in `noise`. If `snr` is `None`, `noise` must be specified. noise: The noise power (variance) in linear units. If `noise` is `None`, `snr` must be specified. - seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng()`. + seed: The seed for the random number generator. This is passed to :func:`numpy.random.default_rng`. Returns: The noisy signal $x[n] + w[n]$. diff --git a/src/sdr/plot/_detection.py b/src/sdr/plot/_detection.py index 03beeba61..01ccfd303 100644 --- a/src/sdr/plot/_detection.py +++ b/src/sdr/plot/_detection.py @@ -30,7 +30,7 @@ def p_d( p_d: The probability of detection $P_d$. ax: The axis to plot on. If `None`, the current axis is used. x_label: The x-axis label to use. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Group: plot-detection @@ -139,7 +139,7 @@ def detector_pdfs( p_h0: The probability of the $\mathcal{H}_0$ tails to plot. The smaller the value, the longer the x-axis. p_h1: The probability of the $\mathcal{H}_1$ tails to plot. The smaller the value, the longer the x-axis. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. See Also: sdr.h0, sdr.h1, sdr.threshold diff --git a/src/sdr/plot/_filter.py b/src/sdr/plot/_filter.py index be1988d4b..cf347cfe8 100644 --- a/src/sdr/plot/_filter.py +++ b/src/sdr/plot/_filter.py @@ -225,7 +225,7 @@ def zeros_poles( feedback coefficients $a_j$. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: See the :ref:`fir-filters` example. @@ -312,7 +312,7 @@ def magnitude_response( for real-valued filters and `"two-sided"` for complex-valued filters. y_axis: The y-axis scaling. Options are to display a linear or logarithmic magnitude response. decades: The number of decades to plot when `x_axis="log"`. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: See the :ref:`fir-filters` example. @@ -440,7 +440,7 @@ def phase_response( one-sided spectrum with a logarithmic frequency axis. The default is `"auto"` which selects `"one-sided"` for real-valued filters and `"two-sided"` for complex-valued filters. decades: The number of decades to plot when `x_axis="log"`. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: See the :ref:`fir-filters` example. @@ -563,7 +563,7 @@ def phase_delay( one-sided spectrum with a logarithmic frequency axis. The default is `"auto"` which selects `"one-sided"` for real-valued filters and `"two-sided"` for complex-valued filters. decades: The number of decades to plot when `x_axis="log"`. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: See the :ref:`fir-filters` example. @@ -687,7 +687,7 @@ def group_delay( one-sided spectrum with a logarithmic frequency axis. The default is `"auto"` which selects `"one-sided"` for real-valued filters and `"two-sided"` for complex-valued filters. decades: The number of decades to plot when `x_axis="log"`. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: See the :ref:`fir-filters` example. diff --git a/src/sdr/plot/_freq_domain.py b/src/sdr/plot/_freq_domain.py index 68e42f50b..7d3faa93f 100644 --- a/src/sdr/plot/_freq_domain.py +++ b/src/sdr/plot/_freq_domain.py @@ -20,7 +20,7 @@ def dft( x: npt.ArrayLike, sample_rate: float | None = None, - window: str | None = None, + window: str | float | tuple | None = None, size: int | None = None, oversample: int | None = None, fast: bool = False, @@ -39,7 +39,8 @@ def dft( x: The time-domain signal $x[n]$. sample_rate: The sample rate $f_s$ of the signal in samples/s. If `None`, the x-axis will be labeled as "Normalized frequency". - window: The windowing function to use. This can be a string or a vector of length `length`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. size: The number of points to use for the DFT. If `None`, the length of the signal is used. oversample: The factor to oversample the DFT. If `None`, the DFT is not oversampled. This is only considered if `size` is `None`. @@ -128,8 +129,7 @@ def dft( ax = plt.gca() if window is not None: - w = scipy.signal.windows.get_window(window, x.size) - x = x * w + x *= scipy.signal.windows.get_window(window, x.size) if size is None: if oversample is None: @@ -176,7 +176,7 @@ def dft( def dtft( x: npt.ArrayLike, sample_rate: float | None = None, - window: str | None = None, + window: str | float | tuple | None = None, size: int = int(2**20), # ~1 million points centered: bool = True, ax: plt.Axes | None = None, @@ -191,7 +191,8 @@ def dtft( x: The time-domain signal $x[n]$. sample_rate: The sample rate $f_s$ of the signal in samples/s. If `None`, the x-axis will be labeled as "Normalized frequency". - window: The windowing function to use. This can be a string or a vector of length `length`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. size: The number of points to use for the DTFT. The actual size used will be the nearest power of 2. centered: Indicates whether to center the DTFT about 0. ax: The axis to plot on. If `None`, the current axis is used. diff --git a/src/sdr/plot/_modulation.py b/src/sdr/plot/_modulation.py index 144b15b14..b401c6c90 100644 --- a/src/sdr/plot/_modulation.py +++ b/src/sdr/plot/_modulation.py @@ -39,13 +39,13 @@ def constellation( ax: The axis to plot on. If `None`, the current axis is used. kwargs: Additional keyword arguments to pass to Matplotlib functions. - If `persistence=False`, the following keyword arguments are passed to :func:`matplotlib.pyplot.scatter()`. + If `persistence=False`, the following keyword arguments are passed to :func:`matplotlib.pyplot.scatter`. The defaults may be overwritten. - `"marker"`: `"."` - `"linestyle"`: `"none"` - If `persistence=True`, the following keyword arguments are passed to :func:`numpy.histogram2d()` and + If `persistence=True`, the following keyword arguments are passed to :func:`numpy.histogram2d` and :func:`matplotlib.pyplot.pcolormesh`. The defaults may be overwritten. - `"range"`: +/- 10% of the maximum value @@ -153,7 +153,7 @@ def symbol_map( limits: The axis limits, which apply to both the x- and y-axis. If `None`, the axis limits are set to 50% larger than the maximum value. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. The following keyword arguments are set by default. The defaults may be overwritten. - `"marker"`: `"x"` @@ -271,7 +271,7 @@ def eye( colorbar: Indicates whether to add a colorbar to the plot. This is only added if `color="index"` or `persistence=True`. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`sdr.plot.raster()`. + kwargs: Additional keyword arguments to pass to :func:`sdr.plot.raster`. Example: Modulate 1,000 QPSK symbols using a square root raised cosine (SRRC) pulse shaping filter. The SRRC pulse shape @@ -385,7 +385,7 @@ def phase_tree( color: Indicates how to color the rasters. If `"index"`, the rasters are colored based on their index. If a valid Matplotlib color, the rasters are all colored with that color. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`sdr.plot.raster()`. + kwargs: Additional keyword arguments to pass to :func:`sdr.plot.raster`. Example: Modulate 100 MSK symbols. @@ -465,7 +465,7 @@ def ber( ebn0: The bit energy $E_b$ to noise PSD $N_0$ ratio (dB). ber: The bit error rate $P_{be}$. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.semilogy()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.semilogy`. Examples: Plot theoretical BER curves for BPSK, QPSK, 8-PSK, and 16-PSK in an AWGN channel. @@ -523,7 +523,7 @@ def ser( esn0: The symbol energy $E_s$ to noise PSD $N_0$ ratio (dB). ser: The symbol error rate $P_{se}$. ax: The axis to plot on. If `None`, the current axis is used. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.semilogy()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.semilogy`. Examples: Plot theoretical SER curves for BPSK, QPSK, 8-PSK, and 16-PSK in an AWGN channel. diff --git a/src/sdr/plot/_rc_params.py b/src/sdr/plot/_rc_params.py index 6d9530227..3446e0629 100644 --- a/src/sdr/plot/_rc_params.py +++ b/src/sdr/plot/_rc_params.py @@ -31,7 +31,7 @@ def use_style(): """ Applies :obj:`sdr`'s default :obj:`matplotlib` rcParams. - These style settings may be reverted with :func:`matplotlib.pyplot.rcdefaults()`. + These style settings may be reverted with :func:`matplotlib.pyplot.rcdefaults`. Examples: The following rcParams are applied. diff --git a/src/sdr/plot/_spectral_estimation.py b/src/sdr/plot/_spectral_estimation.py index 3512a5e68..9e7edac1f 100644 --- a/src/sdr/plot/_spectral_estimation.py +++ b/src/sdr/plot/_spectral_estimation.py @@ -21,7 +21,7 @@ def periodogram( x: npt.ArrayLike, sample_rate: float | None = None, - window: str | npt.ArrayLike = "hann", + window: str | float | tuple | None = "hann", length: int | None = None, overlap: int | None = None, fft: int | None = None, @@ -43,7 +43,8 @@ def periodogram( x: The time-domain signal $x[n]$. sample_rate: The sample rate $f_s$ of the signal in samples/s. If `None`, the x-axis will be labeled as "Normalized frequency". - window: The windowing function to use. This can be a string or a vector of length `length`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. length: The length of each segment in samples. If `None`, the length is set to 256. overlap: The number of samples to overlap between segments. If `None`, the overlap is set to `length // 2`. fft: The number of points to use in the FFT. If `None`, the FFT length is set to `length`. @@ -54,7 +55,7 @@ def periodogram( one-sided spectrum with a logarithmic frequency axis. The default is `"auto"` which selects `"one-sided"` for real-valued signals and `"two-sided"` for complex-valued signals. y_axis: The y-axis scaling. Options are to display a linear or logarithmic power spectral density. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Group: plot-spectral-estimation @@ -128,7 +129,7 @@ def periodogram( def spectrogram( x: npt.ArrayLike, sample_rate: float | None = None, - window: str | npt.ArrayLike = "hann", + window: str | float | tuple | None = "hann", length: int | None = None, overlap: int | None = None, fft: int | None = None, @@ -143,13 +144,14 @@ def spectrogram( Plots the spectrogram of a time-domain signal $x[n]$ using Welch's method. Note: - This function uses :func:`scipy.signal.spectrogram()` to estimate the spectrogram of the time-domain signal. + This function uses :func:`scipy.signal.spectrogram` to estimate the spectrogram of the time-domain signal. Arguments: x: The time-domain signal $x[n]$. sample_rate: The sample rate $f_s$ of the signal in samples/s. If `None`, the x-axis will be label as "Samples" and the y-axis as "Normalized frequency". - window: The windowing function to use. This can be a string or a vector of length `length`. + window: The SciPy window definition. See :func:`scipy.signal.windows.get_window` for details. + If `None`, no window is applied. length: The length of each segment in samples. If `None`, the length is set to 256. overlap: The number of samples to overlap between segments. If `None`, the overlap is set to `length // 2`. fft: The number of points to use in the FFT. If `None`, the FFT length is set to `length`. @@ -158,7 +160,7 @@ def spectrogram( y_axis: The y-axis scaling. Options are to display a one-sided spectrum or a two-sided spectrum. The default is `"auto"` which selects `"one-sided"` for real-valued signals and `"two-sided"` for complex-valued signals. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.pcolormesh()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.pcolormesh`. The following keyword arguments are set by default. The defaults may be overwritten. - `"vmin"`: 10th percentile diff --git a/src/sdr/plot/_time_domain.py b/src/sdr/plot/_time_domain.py index ad1c04dbb..47a0ebcc6 100644 --- a/src/sdr/plot/_time_domain.py +++ b/src/sdr/plot/_time_domain.py @@ -83,7 +83,7 @@ def time_domain( # noqa: D417 real and imaginary parts will have different colors based on the current Matplotlib color cycle. If `"line"`, the real part will have a solid line and the imaginary part will have a dashed line, and both lines will share the same color. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: Plot a square-root raised cosine (SRRC) pulse shape centered about 0. @@ -332,7 +332,7 @@ def correlation( y: The second time-domain signal $y[n]$. sample_rate: The sample rate $f_s$ of the signal in samples/s. If `None`, the x-axis will be labeled as "Lag (samples)". - mode: The :func:`numpy.correlate()` correlation mode. If `"circular"`, a circular correlation is computed + mode: The :func:`numpy.correlate` correlation mode. If `"circular"`, a circular correlation is computed using FFTs. ax: The axis to plot on. If `None`, the current axis is used. y_axis: Indicates how to plot the y-axis. If `"complex"`, the real and imaginary parts are plotted separately. @@ -340,7 +340,7 @@ def correlation( real and imaginary parts will have different colors based on the current Matplotlib color cycle. If `"line"`, the real part will have a solid line and the imaginary part will have a dashed line, and both lines will share the same color. - kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot()`. + kwargs: Additional keyword arguments to pass to :func:`matplotlib.pyplot.plot`. Examples: Plot the auto-correlation of a length-63 $m$-sequence. Notice that the linear correlation produces sidelobes diff --git a/tests/detection/test_h0_h1_theory.py b/tests/detection/test_h0_h1.py similarity index 100% rename from tests/detection/test_h0_h1_theory.py rename to tests/detection/test_h0_h1.py diff --git a/tests/dsp/filter/__init__.py b/tests/dsp/filter/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/dsp/filter/test_noise_bandwidth.py b/tests/dsp/filter/test_noise_bandwidth.py new file mode 100644 index 000000000..7d0c65e95 --- /dev/null +++ b/tests/dsp/filter/test_noise_bandwidth.py @@ -0,0 +1,89 @@ +""" +References: + - https://www.gaussianwaves.com/2020/09/equivalent-noise-bandwidth-enbw-of-window-functions/ +""" + +import pytest +import scipy.signal + +import sdr + + +def test_boxcar(): + n = 128 + h = scipy.signal.windows.get_window("boxcar", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.000, rel=1e-3) + + +def test_barthann(): + n = 128 + h = scipy.signal.windows.get_window("barthann", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.456, rel=1e-3) + + +def test_bartlett(): + n = 128 + h = scipy.signal.windows.get_window("bartlett", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.333, rel=1e-3) + + +def test_blackman(): + n = 128 + h = scipy.signal.windows.get_window("blackman", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.727, rel=1e-3) + + +def test_blackmanharris(): + n = 128 + h = scipy.signal.windows.get_window("blackmanharris", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(2.004, rel=1e-3) + + +def test_bohman(): + n = 128 + h = scipy.signal.windows.get_window("bohman", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.786, rel=1e-3) + + +def test_flattop(): + n = 128 + h = scipy.signal.windows.get_window("flattop", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(3.770, rel=1e-3) + + +def test_hamming(): + n = 128 + h = scipy.signal.windows.get_window("hamming", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.363, rel=1e-3) + + +def test_hann(): + n = 128 + h = scipy.signal.windows.get_window("hann", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.500, rel=1e-3) + + +def test_nuttall(): + n = 128 + h = scipy.signal.windows.get_window("nuttall", n) + fir = sdr.FIR(h) + B_n = fir.noise_bandwidth(n) + assert B_n == pytest.approx(1.976, rel=1e-3) diff --git a/tests/dsp/fir/test_design_bandpass_fir.py b/tests/dsp/fir/test_bandpass_fir.py similarity index 93% rename from tests/dsp/fir/test_design_bandpass_fir.py rename to tests/dsp/fir/test_bandpass_fir.py index a3b1e3177..514714750 100644 --- a/tests/dsp/fir/test_design_bandpass_fir.py +++ b/tests/dsp/fir/test_bandpass_fir.py @@ -11,7 +11,7 @@ def test_custom(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="custom", CustomWindow=ones(1, 31)); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window=None) + h = sdr.bandpass_fir(30, 0.4, 0.25, window=None) h_truth = np.array( [ -0.017963811098946, @@ -56,7 +56,7 @@ def test_hamming(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="hamming"); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="hamming") + h = sdr.bandpass_fir(30, 0.4, 0.25, window="hamming") h_truth = np.array( [ -0.001295920763505, @@ -101,7 +101,7 @@ def test_hann(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="hann"); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="hann") + h = sdr.bandpass_fir(30, 0.4, 0.25, window="hann") h_truth = np.array( [ 0, @@ -146,7 +146,7 @@ def test_blackman(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="blackman"); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="blackman") + h = sdr.bandpass_fir(30, 0.4, 0.25, window="blackman") h_truth = np.array( [ 0.000000000000000, @@ -191,7 +191,7 @@ def test_blackman_harris(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="blackman-harris"); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="blackman-harris") + h = sdr.bandpass_fir(30, 0.4, 0.25, window="blackmanharris") h_truth = np.array( [ -0.000001060796132, @@ -236,7 +236,7 @@ def test_chebyshev(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="chebyshev"); >> transpose(h) """ - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="chebyshev") + h = sdr.bandpass_fir(30, 0.4, 0.25, window=("chebwin", 60)) h_truth = np.array( [ -0.000309113441909, @@ -281,8 +281,8 @@ def test_kaiser(): >> h = designBandpassFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="kaiser"); >> transpose(h) """ - # MATLAB uses beta=0.5 for Kaiser window. Attenuation of 21.542 dB was reverse engineered. - h = sdr.design_bandpass_fir(30, 0.4, 0.25, window="kaiser", atten=21.542) + # MATLAB uses beta=0.5 for Kaiser window + h = sdr.bandpass_fir(30, 0.4, 0.25, window=("kaiser", 0.5)) h_truth = np.array( [ -0.016765450319470, diff --git a/tests/dsp/fir/test_design_bandstop_fir.py b/tests/dsp/fir/test_bandstop_fir.py similarity index 88% rename from tests/dsp/fir/test_design_bandstop_fir.py rename to tests/dsp/fir/test_bandstop_fir.py index b93676efe..e3b689997 100644 --- a/tests/dsp/fir/test_design_bandstop_fir.py +++ b/tests/dsp/fir/test_bandstop_fir.py @@ -11,7 +11,7 @@ def test_custom(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="custom", CustomWindow=ones(1,31)); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window=None) + h = sdr.bandstop_fir(30, 0.4, 0.25, window=None) h_truth = np.array( [ 0.017963811098946, @@ -47,7 +47,7 @@ def test_custom(): 0.017963811098946, ] ) - verify_impulse_response(h, h_truth, atol=1e-1) + verify_impulse_response(h, h_truth, atol=1e-1) # TODO: These aren't exactly identical def test_hamming(): @@ -56,7 +56,7 @@ def test_hamming(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="hamming"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="hamming") + h = sdr.bandstop_fir(30, 0.4, 0.25, window="hamming") h_truth = np.array( [ 0.001295920763505, @@ -92,7 +92,7 @@ def test_hamming(): 0.001295920763505, ] ) - verify_impulse_response(h, h_truth, atol=1e-3) + verify_impulse_response(h, h_truth, atol=1e-3) # TODO: These aren't exactly identical def test_hann(): @@ -101,7 +101,7 @@ def test_hann(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="hann"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="hann") + h = sdr.bandstop_fir(30, 0.4, 0.25, window="hann") h_truth = np.array( [ 0, @@ -137,7 +137,7 @@ def test_hann(): 0, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_blackman(): @@ -146,7 +146,7 @@ def test_blackman(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="blackman"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="blackman") + h = sdr.bandstop_fir(30, 0.4, 0.25, window="blackman") h_truth = np.array( [ -0.000000000000000, @@ -182,7 +182,7 @@ def test_blackman(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_blackman_harris(): @@ -191,7 +191,7 @@ def test_blackman_harris(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="blackman-harris"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="blackman-harris") + h = sdr.bandstop_fir(30, 0.4, 0.25, window="blackmanharris") h_truth = np.array( [ 0.000001060796132, @@ -227,7 +227,7 @@ def test_blackman_harris(): 0.000001060796132, ] ) - verify_impulse_response(h, h_truth, atol=1e-1) + verify_impulse_response(h, h_truth, atol=1e-1) # TODO: These aren't exactly identical def test_chebyshev(): @@ -236,7 +236,7 @@ def test_chebyshev(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="chebyshev"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="chebyshev") + h = sdr.bandstop_fir(30, 0.4, 0.25, window=("chebwin", 60)) h_truth = np.array( [ 0.000309113441909, @@ -272,7 +272,7 @@ def test_chebyshev(): 0.000309113441909, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_kaiser(): @@ -281,7 +281,7 @@ def test_kaiser(): >> h = designBandstopFIR(FilterOrder=30, CenterFrequency=0.4, Bandwidth=0.25, Window="kaiser"); >> transpose(h) """ - h = sdr.design_bandstop_fir(30, 0.4, 0.25, window="kaiser", atten=20) + h = sdr.bandstop_fir(30, 0.4, 0.25, window=("kaiser", 0.5)) h_truth = np.array( [ 0.016765450319470, @@ -317,4 +317,4 @@ def test_kaiser(): 0.016765450319470, ] ) - verify_impulse_response(h, h_truth, atol=1e-1) + verify_impulse_response(h, h_truth, atol=1e-1) # TODO: These aren't exactly identical diff --git a/tests/dsp/fir/test_fir.py b/tests/dsp/fir/test_fir.py index a215d3226..351a02b6a 100644 --- a/tests/dsp/fir/test_fir.py +++ b/tests/dsp/fir/test_fir.py @@ -71,3 +71,150 @@ def test_impulse_response(): h_truth = np.concatenate((h_truth, [0] * 10)) assert h.shape == h_truth.shape assert np.allclose(h, h_truth) + + +def test_group_delay(): + """ + MATLAB: + >> b = fircls1(54,0.3,0.02,0.008); + >> [phi, w] = grpdelay(b, 1, 30, 'whole'); + """ + b = np.array( + [ + 0.000615962351741, + -0.006511121375111, + -0.006183540431315, + -0.003722363006434, + 0.001572503340765, + 0.006331717630481, + 0.006057839196610, + -0.000001882825214, + -0.007714271124356, + -0.010142338952650, + -0.003693339523364, + 0.007892419862013, + 0.015062898684317, + 0.009960454375576, + -0.005808037223313, + -0.020311637745345, + -0.019602542557400, + -0.000117171744191, + 0.025304978475102, + 0.034483175486423, + 0.013232973417498, + -0.029434340049555, + -0.061377964476540, + -0.045857204952108, + 0.032161361913682, + 0.150419734906289, + 0.257272368662233, + 0.300218735368335, + 0.257272368662233, + 0.150419734906289, + 0.032161361913682, + -0.045857204952108, + -0.061377964476540, + -0.029434340049555, + 0.013232973417498, + 0.034483175486423, + 0.025304978475102, + -0.000117171744191, + -0.019602542557400, + -0.020311637745345, + -0.005808037223313, + 0.009960454375576, + 0.015062898684317, + 0.007892419862013, + -0.003693339523364, + -0.010142338952650, + -0.007714271124356, + -0.000001882825214, + 0.006057839196610, + 0.006331717630481, + 0.001572503340765, + -0.003722363006434, + -0.006183540431315, + -0.006511121375111, + 0.000615962351741, + ] + ) + omega_truth = np.array( + [ + 0, + 0.209439510239320, + 0.418879020478639, + 0.628318530717959, + 0.837758040957278, + 1.047197551196598, + 1.256637061435917, + 1.466076571675237, + 1.675516081914556, + 1.884955592153876, + 2.094395102393195, + 2.303834612632515, + 2.513274122871834, + 2.722713633111154, + 2.932153143350474, + 3.141592653589793, + 3.351032163829113, + 3.560471674068432, + 3.769911184307752, + 3.979350694547071, + 4.188790204786391, + 4.398229715025710, + 4.607669225265029, + 4.817108735504349, + 5.026548245743669, + 5.235987755982989, + 5.445427266222309, + 5.654866776461628, + 5.864306286700947, + 6.073745796940267, + ] + ) + phi_truth = np.array( + [ + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + 27, + ] + ) + + fir = sdr.FIR(b) + f, phi = fir.group_delay(N=omega_truth.size) + omega = 2 * np.pi * f + + # Convert to 0 to 2*pi + omega = np.fft.fftshift(omega) + omega = np.mod(omega, 2 * np.pi) + phi = np.fft.fftshift(phi) + + assert np.allclose(omega, omega_truth) + assert np.allclose(phi, phi_truth) diff --git a/tests/dsp/fir/test_design_highpass_fir.py b/tests/dsp/fir/test_highpass_fir.py similarity index 88% rename from tests/dsp/fir/test_design_highpass_fir.py rename to tests/dsp/fir/test_highpass_fir.py index f8524dbc2..f4626b7cf 100644 --- a/tests/dsp/fir/test_design_highpass_fir.py +++ b/tests/dsp/fir/test_highpass_fir.py @@ -1,4 +1,5 @@ import numpy as np +import scipy.signal import sdr @@ -11,7 +12,7 @@ def test_custom(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="custom", CustomWindow=ones(1,31)); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window=None) + h = sdr.highpass_fir(30, 0.6, window=None) h_truth = np.array( [ -0.000000000000000, @@ -47,7 +48,7 @@ def test_custom(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_hamming(): @@ -56,7 +57,7 @@ def test_hamming(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="hamming"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="hamming") + h = sdr.highpass_fir(30, 0.6, window="hamming") h_truth = np.array( [ -0.000000000000000, @@ -92,7 +93,7 @@ def test_hamming(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_hann(): @@ -101,7 +102,7 @@ def test_hann(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="hann"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="hann") + h = sdr.highpass_fir(30, 0.6, window="hann") h_truth = np.array( [ 0, @@ -137,7 +138,7 @@ def test_hann(): 0, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_blackman(): @@ -146,7 +147,7 @@ def test_blackman(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="blackman"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="blackman") + h = sdr.highpass_fir(30, 0.6, window="blackman") h_truth = np.array( [ 0.000000000000000, @@ -182,7 +183,7 @@ def test_blackman(): 0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_blackman_harris(): @@ -191,7 +192,7 @@ def test_blackman_harris(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="blackman-harris"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="blackman-harris") + h = sdr.highpass_fir(30, 0.6, window="blackmanharris") h_truth = np.array( [ -0.000000000000000, @@ -227,7 +228,7 @@ def test_blackman_harris(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_chebyshev(): @@ -236,7 +237,7 @@ def test_chebyshev(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="chebyshev"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="chebyshev") + h = sdr.highpass_fir(30, 0.6, window=("chebwin", 60)) h_truth = np.array( [ -0.000000000000000, @@ -272,7 +273,7 @@ def test_chebyshev(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical # NOTE: Added extra Kaiser window tests to reverse engineer MATLAB's beta parameter @@ -284,7 +285,7 @@ def test_kaiser_0p2(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="kaiser"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.2, window="kaiser", atten=30) + h = sdr.highpass_fir(30, 0.2, window=("kaiser", 0.5)) h_truth = np.array( [ -0.000000000000000, @@ -320,7 +321,7 @@ def test_kaiser_0p2(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-1) + verify_impulse_response(h, h_truth, atol=1e-1) # TODO: These aren't exactly identical def test_kaiser_0p4(): @@ -329,7 +330,7 @@ def test_kaiser_0p4(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.4, Window="kaiser"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.4, window="kaiser", atten=21.542) + h = sdr.highpass_fir(30, 0.4, window=("kaiser", 0.5)) h_truth = np.array( [ 0.000000000000000, @@ -365,7 +366,7 @@ def test_kaiser_0p4(): 0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-1) + verify_impulse_response(h, h_truth, atol=1e-1) # TODO: These aren't exactly identical def test_kaiser_0p6(): @@ -374,7 +375,8 @@ def test_kaiser_0p6(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.6, Window="kaiser"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.6, window="kaiser", atten=60) + beta = scipy.signal.kaiser_beta(60) + h = sdr.highpass_fir(30, 0.6, window=("kaiser", beta)) h_truth = np.array( [ -0.000000000000000, @@ -410,7 +412,7 @@ def test_kaiser_0p6(): -0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical def test_kaiser_0p8(): @@ -419,7 +421,7 @@ def test_kaiser_0p8(): >> h = designHighpassFIR(FilterOrder=30, CutoffFrequency=0.8, Window="kaiser"); >> transpose(h) """ - h = sdr.design_highpass_fir(30, 0.8, window="kaiser", atten=20) + h = sdr.highpass_fir(30, 0.8, window=("kaiser", 0.5)) h_truth = np.array( [ 0.000000000000000, @@ -455,4 +457,4 @@ def test_kaiser_0p8(): 0.000000000000000, ] ) - verify_impulse_response(h, h_truth, atol=1e-2) + verify_impulse_response(h, h_truth, atol=1e-2) # TODO: These aren't exactly identical diff --git a/tests/dsp/fir/test_design_lowpass_fir.py b/tests/dsp/fir/test_lowpass_fir.py similarity index 93% rename from tests/dsp/fir/test_design_lowpass_fir.py rename to tests/dsp/fir/test_lowpass_fir.py index 99aecaf2a..f539ff1c0 100644 --- a/tests/dsp/fir/test_design_lowpass_fir.py +++ b/tests/dsp/fir/test_lowpass_fir.py @@ -11,7 +11,7 @@ def test_custom(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="custom", CustomWindow=ones(1, 31)); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window=None) + h = sdr.lowpass_fir(30, 0.2, window=None) h_truth = np.array( [ 0.000000000000000, @@ -56,7 +56,7 @@ def test_hamming(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="hamming"); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window="hamming") + h = sdr.lowpass_fir(30, 0.2, window="hamming") h_truth = np.array( [ 0.000000000000000, @@ -101,7 +101,7 @@ def test_hann(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="hann"); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window="hann") + h = sdr.lowpass_fir(30, 0.2, window="hann") h_truth = np.array( [ 0, @@ -146,7 +146,7 @@ def test_blackman(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="blackman"); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window="blackman") + h = sdr.lowpass_fir(30, 0.2, window="blackman") h_truth = np.array( [ -0.000000000000000, @@ -191,7 +191,7 @@ def test_blackman_harris(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="blackman-harris"); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window="blackman-harris") + h = sdr.lowpass_fir(30, 0.2, window="blackmanharris") h_truth = np.array( [ 0.000000000000000, @@ -236,7 +236,7 @@ def test_chebyshev(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="chebyshev"); >> transpose(h) """ - h = sdr.design_lowpass_fir(30, 0.2, window="chebyshev") + h = sdr.lowpass_fir(30, 0.2, window=("chebwin", 60)) h_truth = np.array( [ 0.000000000000000, @@ -284,8 +284,8 @@ def test_kaiser30_0p1(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.1, Window="kaiser"); >> transpose(h) """ - # MATLAB uses beta=0.5 for Kaiser window. Attenuation of 21.542 dB was reverse engineered. - h = sdr.design_lowpass_fir(30, 0.1, "kaiser", atten=21.542) + # MATLAB uses beta=0.5 for Kaiser window + h = sdr.lowpass_fir(30, 0.1, ("kaiser", 0.5)) h_truth = np.array( [ -0.019837202437853, @@ -330,8 +330,8 @@ def test_kaiser_30_0p2(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.2, Window="kaiser"); >> transpose(h) """ - # MATLAB uses beta=0.5 for Kaiser window. Attenuation of 21.542 dB was reverse engineered. - h = sdr.design_lowpass_fir(30, 0.2, window="kaiser", atten=21.542) + # MATLAB uses beta=0.5 for Kaiser window + h = sdr.lowpass_fir(30, 0.2, window=("kaiser", 0.5)) h_truth = np.array( [ 0.000000000000000, @@ -376,8 +376,8 @@ def test_kaiser30_0p3(): >> h = designLowpassFIR(FilterOrder=30, CutoffFrequency=0.3, Window="kaiser"); >> transpose(h) """ - # MATLAB uses beta=0.5 for Kaiser window. Attenuation of 21.542 dB was reverse engineered. - h = sdr.design_lowpass_fir(30, 0.3, "kaiser", atten=21.542) + # MATLAB uses beta=0.5 for Kaiser window + h = sdr.lowpass_fir(30, 0.3, ("kaiser", 0.5)) h_truth = np.array( [ 0.019631739746596, @@ -422,8 +422,8 @@ def test_kaiser_60_0p2(): >> h = designLowpassFIR(FilterOrder=60, CutoffFrequency=0.2, Window="kaiser"); >> transpose(h) """ - # MATLAB uses beta=0.5 for Kaiser window. Attenuation of 21.542 dB was reverse engineered. - h = sdr.design_lowpass_fir(60, 0.2, "kaiser", atten=21.542) + # MATLAB uses beta=0.5 for Kaiser window + h = sdr.lowpass_fir(60, 0.2, ("kaiser", 0.5)) h_truth = np.array( [ -0.000000000000000, diff --git a/tests/dsp/multirate/test_design_multirate_fir.py b/tests/dsp/multirate/test_multirate_fir.py similarity index 97% rename from tests/dsp/multirate/test_design_multirate_fir.py rename to tests/dsp/multirate/test_multirate_fir.py index 30d93fef3..ee1655808 100644 --- a/tests/dsp/multirate/test_design_multirate_fir.py +++ b/tests/dsp/multirate/test_multirate_fir.py @@ -8,7 +8,7 @@ def test_3_1(): MATLAB: >> transpose(designMultirateFIR(3, 1)) """ - h = sdr.design_multirate_fir(3, 1) + h = sdr.multirate_fir(3, 1) h_truth = np.array( [ 0, @@ -93,7 +93,7 @@ def test_1_3(): MATLAB: >> transpose(designMultirateFIR(1, 3)) """ - h = sdr.design_multirate_fir(1, 3) + h = sdr.multirate_fir(1, 3) h_truth = np.array( [ 0, @@ -178,7 +178,7 @@ def test_3_2(): MATLAB: >> transpose(designMultirateFIR(3, 2)) """ - h = sdr.design_multirate_fir(3, 2) + h = sdr.multirate_fir(3, 2) h_truth = np.array( [ 0, @@ -263,7 +263,7 @@ def test_2_3(): MATLAB: >> transpose(designMultirateFIR(2, 3)) """ - h = sdr.design_multirate_fir(2, 3) + h = sdr.multirate_fir(2, 3) h_truth = np.array( [ 0, @@ -324,7 +324,7 @@ def test_3_2_20(): MATLAB: >> transpose(designMultirateFIR(3, 2, 20)) """ - h = sdr.design_multirate_fir(3, 2, polyphase_order=39) + h = sdr.multirate_fir(3, 2, polyphase_order=39) h_truth = np.array( [ 0, @@ -457,7 +457,7 @@ def test_2_3_20(): MATLAB: >> transpose(designMultirateFIR(2, 3, 20)) """ - h = sdr.design_multirate_fir(2, 3, polyphase_order=39) + h = sdr.multirate_fir(2, 3, polyphase_order=39) h_truth = np.array( [ -0.000036875262276, @@ -551,7 +551,7 @@ def test_3_2_15_120(): MATLAB: >> transpose(designMultirateFIR(3, 2, 15, 120)) """ - h = sdr.design_multirate_fir(3, 2, polyphase_order=29, atten=120) + h = sdr.multirate_fir(3, 2, polyphase_order=29, atten=120) h_truth = np.array( [ 0, @@ -654,7 +654,7 @@ def test_2_3_15_120(): MATLAB: >> transpose(designMultirateFIR(2, 3, 15, 120)) """ - h = sdr.design_multirate_fir(2, 3, polyphase_order=29, atten=120) + h = sdr.multirate_fir(2, 3, polyphase_order=29, atten=120) h_truth = np.array( [ 0, diff --git a/tests/dsp/resampling/test_design_frac_delay_fir.py b/tests/dsp/resampling/test_fractional_delay_fir.py similarity index 94% rename from tests/dsp/resampling/test_design_frac_delay_fir.py rename to tests/dsp/resampling/test_fractional_delay_fir.py index 4ebe62a3a..f3c39f1b9 100644 --- a/tests/dsp/resampling/test_design_frac_delay_fir.py +++ b/tests/dsp/resampling/test_fractional_delay_fir.py @@ -11,7 +11,7 @@ def test_0p25_10(): >> h = designFracDelayFIR(0.25, 10); >> transpose(h) """ - h = sdr.design_frac_delay_fir(10, 0.25) + h = sdr.fractional_delay_fir(10, 0.25) h_truth = np.array( [ 0.003897161584427, @@ -35,7 +35,7 @@ def test_0p25_21(): >> h = designFracDelayFIR(0.25, 21); >> transpose(h) """ - h = sdr.design_frac_delay_fir(21, 0.25) + h = sdr.fractional_delay_fir(21, 0.25) h_truth = np.array( [ -0.000459853432853, @@ -70,7 +70,7 @@ def test_0p25_30(): >> h = designFracDelayFIR(0.25, 30); >> transpose(h) """ - h = sdr.design_frac_delay_fir(30, 0.25) + h = sdr.fractional_delay_fir(30, 0.25) h_truth = np.array( [ 0.000162561838444, @@ -114,7 +114,7 @@ def test_0p5_10(): >> h = designFracDelayFIR(0.5, 10); >> transpose(h) """ - h = sdr.design_frac_delay_fir(10, 0.5) + h = sdr.fractional_delay_fir(10, 0.5) h_truth = np.array( [ 0.005176009486030, @@ -138,7 +138,7 @@ def test_0p5_21(): >> h = designFracDelayFIR(0.5, 21); >> transpose(h) """ - h = sdr.design_frac_delay_fir(21, 0.5) + h = sdr.fractional_delay_fir(21, 0.5) h_truth = np.array( [ -0.000632089078103, @@ -173,7 +173,7 @@ def test_0p5_30(): >> h = designFracDelayFIR(0.5, 30); >> transpose(h) """ - h = sdr.design_frac_delay_fir(30, 0.5) + h = sdr.fractional_delay_fir(30, 0.5) h_truth = np.array( [ 0.000225731894765, @@ -217,7 +217,7 @@ def test_0p75_10(): >> h = designFracDelayFIR(0.75, 10); >> transpose(h) """ - h = sdr.design_frac_delay_fir(10, 0.75) + h = sdr.fractional_delay_fir(10, 0.75) h_truth = np.array( [ 0.003486934049224, @@ -241,7 +241,7 @@ def test_0p75_21(): >> h = designFracDelayFIR(0.75, 21); >> transpose(h) """ - h = sdr.design_frac_delay_fir(21, 0.75) + h = sdr.fractional_delay_fir(21, 0.75) h_truth = np.array( [ -0.000436271205527, @@ -276,7 +276,7 @@ def test_0p75_30(): >> h = designFracDelayFIR(0.75, 30); >> transpose(h) """ - h = sdr.design_frac_delay_fir(30, 0.75) + h = sdr.fractional_delay_fir(30, 0.75) h_truth = np.array( [ 0.000157051267649, @@ -320,7 +320,7 @@ def test_0p123456789_10(): >> h = designFracDelayFIR(0.123456789, 10); >> transpose(h) """ - h = sdr.design_frac_delay_fir(10, 0.123456789) + h = sdr.fractional_delay_fir(10, 0.123456789) h_truth = np.array( [ 0.002163817437857, @@ -344,7 +344,7 @@ def test_0p123456789_21(): >> h = designFracDelayFIR(0.123456789, 21); >> transpose(h) """ - h = sdr.design_frac_delay_fir(21, 0.123456789) + h = sdr.fractional_delay_fir(21, 0.123456789) h_truth = np.array( [ -0.000249932079617, @@ -379,7 +379,7 @@ def test_0p123456789_30(): >> h = designFracDelayFIR(0.123456789, 30); >> transpose(h) """ - h = sdr.design_frac_delay_fir(30, 0.123456789) + h = sdr.fractional_delay_fir(30, 0.123456789) h_truth = np.array( [ 0.000087825242290, @@ -423,7 +423,7 @@ def test_0p876543211_10(): >> h = designFracDelayFIR(1 - 0.123456789, 10); >> transpose(h) """ - h = sdr.design_frac_delay_fir(10, 1 - 0.123456789) + h = sdr.fractional_delay_fir(10, 1 - 0.123456789) h_truth = np.array( [ 0.001829658288306, @@ -447,7 +447,7 @@ def test_0p876543211_21(): >> h = designFracDelayFIR(1 - 0.123456789, 21); >> transpose(h) """ - h = sdr.design_frac_delay_fir(21, 1 - 0.123456789) + h = sdr.fractional_delay_fir(21, 1 - 0.123456789) h_truth = np.array( [ -0.000230874758491, @@ -482,7 +482,7 @@ def test_0p876543211_30(): >> h = designFracDelayFIR(1 - 0.123456789, 30); >> transpose(h) """ - h = sdr.design_frac_delay_fir(30, 1 - 0.123456789) + h = sdr.fractional_delay_fir(30, 1 - 0.123456789) h_truth = np.array( [ 0.000083379317149, diff --git a/tests/probability/test_multiply_distribution.py b/tests/probability/test_multiply_distribution.py new file mode 100644 index 000000000..20371a819 --- /dev/null +++ b/tests/probability/test_multiply_distribution.py @@ -0,0 +1,60 @@ +import numpy as np +import scipy.stats + +import sdr + + +def test_normal_normal(): + X = scipy.stats.norm(loc=3, scale=0.5) + Y = scipy.stats.norm(loc=5, scale=1.5) + _verify(X, Y) + + +def test_rayleigh_rayleigh(): + X = scipy.stats.rayleigh(scale=1) + Y = scipy.stats.rayleigh(loc=1, scale=2) + _verify(X, Y) + + +def test_rician_rician(): + X = scipy.stats.rice(2) + Y = scipy.stats.rice(3) + _verify(X, Y) + + +def test_normal_rayleigh(): + X = scipy.stats.norm(loc=-1, scale=0.5) + Y = scipy.stats.rayleigh(loc=2, scale=1.5) + _verify(X, Y) + + +def test_rayleigh_rician(): + X = scipy.stats.rayleigh(scale=1) + Y = scipy.stats.rice(3) + _verify(X, Y) + + +def _verify(X, Y): + # Empirically compute the distribution + z = X.rvs(250_000) * Y.rvs(250_000) + hist, bins = np.histogram(z, bins=51, density=True) + x = bins[1:] - np.diff(bins) / 2 + + # Numerically compute the distribution, only do so over the histogram bins (for speed) + Z = sdr.multiply_distributions(X, Y, x) + + if False: + import matplotlib.pyplot as plt + + plt.figure() + plt.plot(x, X.pdf(x), label="X") + plt.plot(x, Y.pdf(x), label="Y") + plt.plot(x, Z.pdf(x), label="X * Y") + plt.hist(z, bins=51, cumulative=False, density=True, histtype="step", label="X * Y empirical") + plt.legend() + plt.xlabel("Random variable") + plt.ylabel("Probability density") + plt.title("Product of two distributions") + plt.show() + + assert np.allclose(Z.pdf(x), hist, atol=np.max(hist) * 0.2) diff --git a/tests/probability/test_sum_distribution.py b/tests/probability/test_sum_distribution.py new file mode 100644 index 000000000..c44263829 --- /dev/null +++ b/tests/probability/test_sum_distribution.py @@ -0,0 +1,50 @@ +import numpy as np +import scipy.stats + +import sdr + + +def test_normal(): + X = scipy.stats.norm(loc=-1, scale=0.5) + _verify(X, 2) + _verify(X, 3) + _verify(X, 4) + + +def test_rayleigh(): + X = scipy.stats.rayleigh(scale=1) + _verify(X, 2) + _verify(X, 3) + _verify(X, 4) + + +def test_rician(): + X = scipy.stats.rice(2) + _verify(X, 2) + _verify(X, 3) + _verify(X, 4) + + +def _verify(X, n): + # Numerically compute the distribution + Y = sdr.sum_distribution(X, n) + + # Empirically compute the distribution + y = X.rvs((100_000, n)).sum(axis=1) + hist, bins = np.histogram(y, bins=101, density=True) + x = bins[1:] - np.diff(bins) / 2 + + if False: + import matplotlib.pyplot as plt + + plt.figure() + plt.plot(x, X.pdf(x), label="X") + plt.plot(x, Y.pdf(x), label="X + ... + X") + plt.hist(y, bins=101, cumulative=False, density=True, histtype="step", label="X + .. + X empirical") + plt.legend() + plt.xlabel("Random variable") + plt.ylabel("Probability density") + plt.title("Sum of distribution") + plt.show() + + assert np.allclose(Y.pdf(x), hist, atol=np.max(hist) * 0.1) diff --git a/tests/probability/test_sum_distributions.py b/tests/probability/test_sum_distributions.py new file mode 100644 index 000000000..1cd536118 --- /dev/null +++ b/tests/probability/test_sum_distributions.py @@ -0,0 +1,60 @@ +import numpy as np +import scipy.stats + +import sdr + + +def test_normal_normal(): + X = scipy.stats.norm(loc=-1, scale=0.5) + Y = scipy.stats.norm(loc=2, scale=1.5) + _verify(X, Y) + + +def test_rayleigh_rayleigh(): + X = scipy.stats.rayleigh(scale=1) + Y = scipy.stats.rayleigh(loc=1, scale=2) + _verify(X, Y) + + +def test_rician_rician(): + X = scipy.stats.rice(2) + Y = scipy.stats.rice(3) + _verify(X, Y) + + +def test_normal_rayleigh(): + X = scipy.stats.norm(loc=-1, scale=0.5) + Y = scipy.stats.rayleigh(loc=2, scale=1.5) + _verify(X, Y) + + +def test_rayleigh_rician(): + X = scipy.stats.rayleigh(scale=1) + Y = scipy.stats.rice(3) + _verify(X, Y) + + +def _verify(X, Y): + # Numerically compute the distribution + Z = sdr.sum_distributions(X, Y) + + # Empirically compute the distribution + z = X.rvs(100_000) + Y.rvs(100_000) + hist, bins = np.histogram(z, bins=101, density=True) + x = bins[1:] - np.diff(bins) / 2 + + if False: + import matplotlib.pyplot as plt + + plt.figure() + plt.plot(x, X.pdf(x), label="X") + plt.plot(x, Y.pdf(x), label="Y") + plt.plot(x, Z.pdf(x), label="X + Y") + plt.hist(z, bins=101, cumulative=False, density=True, histtype="step", label="X + Y empirical") + plt.legend() + plt.xlabel("Random variable") + plt.ylabel("Probability density") + plt.title("Sum of two distributions") + plt.show() + + assert np.allclose(Z.pdf(x), hist, atol=np.max(hist) * 0.1)