Skip to content

Commit

Permalink
Merge branch '221-snr-upgrades' into 'master'
Browse files Browse the repository at this point in the history
Propagation Loss SNR Calculations

Closes #221

See merge request barkhauseninstitut/wicon/hermespy!191
  • Loading branch information
adlerjan committed May 28, 2024
2 parents 5f8e42c + 0483322 commit 30d63e3
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 67 deletions.
4 changes: 4 additions & 0 deletions hermespy/channel/cdl/cluster_delay_lines.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ def delay_offset(self) -> float:

return self.__delay_offset

@property
def expected_energy_scale(self) -> float:
return float(np.sum(self.cluster_powers))

def __ray_impulse_generator(
self, sampling_rate: float, num_samples: int, center_frequency: float
) -> Generator[Tuple[np.ndarray, np.ndarray, float], None, None]:
Expand Down
13 changes: 11 additions & 2 deletions hermespy/channel/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,16 @@ def time(self) -> float:

return self.__state.time

@property
@abstractmethod
def expected_energy_scale(self) -> float:
"""Expected linear scaling of a propagated signal's energy at each receiving antenna.
Required to compute the expected energy of a signal after propagation,
and therfore signal-to-noise ratios (SNRs) and signal-to-interference-plus-noise ratios (SINRs).
"""
... # pragma: no cover

@abstractmethod
def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
"""Propagate radio-frequency band signals over a channel instance.
Expand Down Expand Up @@ -647,15 +657,14 @@ class Channel(ABC, RandomNode, Serializable, Generic[CRT, CST]):

__scenario: SimulationScenario
__gain: float
__interpolation_mode: InterpolationMode
__sample_hooks: Set[ChannelSampleHook[CST]]

def __init__(self, gain: float = 1.0, seed: Optional[int] = None) -> None:
"""
Args:
gain (float, optional):
Linear channel power gain factor.
Linear channel energy gain factor.
Initializes the :meth:`gain<gain>` property.
:math:`1.0` by default.
Expand Down
4 changes: 4 additions & 0 deletions hermespy/channel/delay/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def gain(self) -> float:

return self.__gain

@property
def expected_energy_scale(self) -> float:
return self.__path_gain(self.carrier_frequency)

def __spatial_response(self) -> np.ndarray:

receiver_position = self.receiver_state.position
Expand Down
39 changes: 19 additions & 20 deletions hermespy/channel/fading/fading.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,14 @@ def spatial_response(self) -> np.ndarray:

@property
def gain(self) -> float:
"""Linear power gain factor a signal experiences when being propagated over this realization."""
"""Linear energy gain factor a signal experiences when being propagated over this realization."""

return self.__gain

@property
def expected_energy_scale(self) -> float:
return self.gain * np.sum(self.power_profile)

def __path_impulse_generator(
self, num_samples: int
) -> Generator[Tuple[np.ndarray, int], None, None]:
Expand Down Expand Up @@ -311,27 +315,22 @@ def __path_impulse_generator(
self.nlos_phases,
):

impulse = (
nlos_gain
* (num_sinusoids**-0.5)
* np.sum(
exp(
1j
* np.outer(
nlos_time,
np.cos((2 * pi * n + nlos_angles + nlos_phases) / num_sinusoids),
)
),
axis=1,
keepdims=False,
)
impulse = nlos_gain * np.sum(
np.exp(
1j
* (
np.outer(nlos_time, np.cos((2 * pi * n + nlos_angles) / num_sinusoids))
+ nlos_phases[None, :]
)
),
axis=1,
keepdims=False,
)

# Add the specular component
impulse += los_gain * exp(1j * (los_time * cos(los_angle) + los_phase))

# Scale by the overall path power
# ToDo: Check if the normalization by the number of paths is correct
impulse *= (self.gain * power) ** 0.5
yield impulse, delay

Expand Down Expand Up @@ -475,9 +474,9 @@ def _sample(self, state: LinkState) -> MultipathFadingSample:
if isinstance(self.__los_angles_variable, float)
else 2 * np.pi * self.__los_angles_variable.sample(consistent_sample)
)
nlos_angles = 2 * np.pi * self.__path_angles_variable.sample(consistent_sample)
los_phases = 2 * np.pi * self.__los_phases_variable.sample(consistent_sample)
nlos_phases = 2 * np.pi * self.__path_phases_variable.sample(consistent_sample)
nlos_angles = -np.pi + 2 * np.pi * self.__path_angles_variable.sample(consistent_sample)
los_phases = -np.pi + 2 * np.pi * self.__los_phases_variable.sample(consistent_sample)
nlos_phases = -np.pi + 2 * np.pi * self.__path_phases_variable.sample(consistent_sample)

return MultipathFadingSample(
self.__power_profile,
Expand Down Expand Up @@ -693,7 +692,7 @@ def __init__(
self.__power_profile = self.__power_profile[sorting]
self.__rice_factors = self.__rice_factors[sorting]
self.__num_sinusoids = 20 if num_sinusoids is None else num_sinusoids
self.los_angle = self._rng.uniform(0, 2 * pi) if los_angle is None else los_angle
self.los_angle = self._rng.uniform(-pi, pi) if los_angle is None else los_angle
self.doppler_frequency = 0.0 if doppler_frequency is None else doppler_frequency
self.__los_doppler_frequency = None

Expand Down
4 changes: 4 additions & 0 deletions hermespy/channel/ideal.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def __init__(self, gain: float, state: LinkState) -> None:
# Initialize class attributes
self.__gain = gain

@property
def expected_energy_scale(self) -> float:
return self.__gain

def state(
self,
num_samples: int,
Expand Down
4 changes: 4 additions & 0 deletions hermespy/channel/quadriga/quadriga.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def path_delays(self) -> np.ndarray:

return self.__path_delays

@property
def expected_energy_scale(self) -> float:
return self.__gain * float(np.sum(self.__path_gains))

def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
max_delay_in_samples = int(np.round(np.max(self.path_delays) * self.bandwidth))
propagated_signal = np.zeros(
Expand Down
4 changes: 4 additions & 0 deletions hermespy/channel/radar/radar.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def gain(self) -> float:

return self.__gain

@property
def expected_energy_scale(self) -> float:
return self.__gain # ToDo: Consider the energy scale of the paths

def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
delays = np.array(
[
Expand Down
1 change: 0 additions & 1 deletion hermespy/modem/waveform_single_carrier.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,6 @@ def _base_filter(self) -> np.ndarray:
)
)
impulse_response[idx_0_by_0] = 1 + self.roll_off * (4 / np.pi - 1)

return impulse_response / np.linalg.norm(impulse_response)


Expand Down
32 changes: 28 additions & 4 deletions hermespy/simulation/noise/level.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import abstractmethod

from hermespy.core import Device, ScalarDimension, Transmitter, Receiver, Serializable
from hermespy.channel import Channel, ChannelSample

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
Expand Down Expand Up @@ -132,12 +133,22 @@ class SNR(NoiseLevel):
yaml_tag = "SNR"
__snr: float
__reference: Device | Transmitter | Receiver
__expected_channel_scale: float

def __init__(self, snr: float, reference: Device | Transmitter | Receiver) -> None:
def __init__(
self, snr: float, reference: Device | Transmitter | Receiver, channel: Channel | None = None
) -> None:
"""
Args:
snr (float): Signal-to-noise ratio.
reference (Device |Transmitter | Receiver): Reference of the noise level.
snr (float):
Expected signal-to-noise ratio.
reference (Device |Transmitter | Receiver):
Reference of the noise level, i.e. with which power / energy was the signal generated.
channel (Channel, optional):
Channel instance over which the signal was propagated.
For channel models that consider propagation losses the noise power is scaled accordingly.
"""

# Initialize base class
Expand All @@ -146,6 +157,19 @@ def __init__(self, snr: float, reference: Device | Transmitter | Receiver) -> No
# Initialize class attributes
self.snr = snr
self.reference = reference
self.__expected_channel_scale = 1.0

if channel is not None:
channel.add_sample_hook(self.__update_expected_channel_scale)

def __update_expected_channel_scale(self, sample: ChannelSample) -> None:
"""Update the expected channel scale.
Args:
sample (ChannelSample): Channel sample.
"""

self.__expected_channel_scale = sample.expected_energy_scale

@property
def level(self) -> float:
Expand All @@ -162,7 +186,7 @@ def level(self, value: float) -> None:
self.snr = value

def get_power(self) -> float:
return self.reference.power / self.snr
return self.reference.power / self.snr * self.__expected_channel_scale

@property
def title(self) -> str:
Expand Down
45 changes: 27 additions & 18 deletions tests/integration_tests/test_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest import TestCase

import numpy as np
from numpy.testing import assert_array_almost_equal

from hermespy.channel import Channel, IdealChannel, TDL, Cost259, RandomDelayChannel, TDLType, UrbanMicrocells
from hermespy.core import Transformation
Expand Down Expand Up @@ -90,20 +91,20 @@ def __propagate(self, channel: Channel) -> None:
channel.seed = 42
self.link.seed = 42

device_transmission = self.tx_device.transmit()

self.device_transmission = self.tx_device.transmit()
channel_realization = channel.realize()
channel_sample = channel_realization.sample(self.tx_device, self.rx_device)
channel_propagation = channel_sample.propagate(device_transmission)
self.channel_sample = channel_realization.sample(self.tx_device, self.rx_device)
channel_propagation = self.channel_sample.propagate(self.device_transmission)
self.rx_device.process_input(channel_propagation)
link_reception = self.link.receive()
self.link_reception = self.link.receive()

# Debug:
#
link_transmission = device_transmission.operator_transmissions[0]
tx = link_transmission.signal
state = channel_sample.state(tx.num_samples, tx.num_samples)
rx_prediction = state.propagate(tx)
return
# link_transmission = device_transmission.operator_transmissions[0]
# tx = link_transmission.signal
# state = self.channel_sample.state(tx.num_samples, tx.num_samples)
# rx_prediction = state.propagate(tx)

# =======================
# Waveform configurations
Expand All @@ -115,7 +116,7 @@ def __configure_single_carrier_waveform(self, channel: Channel) -> RootRaisedCos
Returns: The configured waveform.
"""

waveform = RootRaisedCosineWaveform(symbol_rate=1 / 10e-6, num_preamble_symbols=10, num_data_symbols=40, pilot_rate=10, oversampling_factor=8, roll_off=0.9)
waveform = RootRaisedCosineWaveform(symbol_rate=1 / 10e-6, num_preamble_symbols=10, num_data_symbols=160, pilot_rate=10, oversampling_factor=8, roll_off=0.9)
waveform.synchronization = SingleCarrierCorrelationSynchronization()
waveform.channel_estimation = SingleCarrierIdealChannelEstimation(channel, self.tx_device, self.rx_device)
waveform.channel_equalization = SingleCarrierZeroForcingChannelEqualization()
Expand Down Expand Up @@ -282,7 +283,7 @@ def __configure_COST259_channel(self) -> Cost259:
Returns: The configured channel.
"""

channel = Cost259(gain=0.9, doppler_frequency=self._doppler_frequency)
channel = Cost259(gain=1.0, doppler_frequency=self._doppler_frequency, num_sinusoids=100)
return channel

def __configure_5GTDL_channel(self) -> TDL:
Expand Down Expand Up @@ -319,20 +320,28 @@ def __configure_delay_channel(self) -> RandomDelayChannel:
# =======================

def __assert_link(self) -> None:

# Test bit error rates
ber_treshold = 1e-2

# Test signal powers
# transmitted_power = self.device_transmission.emerging_signals[0].power
# expected_transmit_power = self.link.power

self.assertGreaterEqual(ber_treshold, self.ber.evaluate().artifact().to_scalar())
# assert_array_almost_equal(expected_transmit_power * np.ones_like(transmitted_power), transmitted_power, 1, "Transmitted waveform's power does not match expectations")

def test_ideal_channel_chirp_fsk(self) -> None:
"""Verify a valid SISO link over an ideal channel with chirp frequency shift keying modulation"""

self.__configure_chirp_fsk_waveform()
self.__propagate(IdealChannel())
self.__propagate(IdealChannel(.9))
self.__assert_link()

def test_ideal_channel_single_carrier(self) -> None:
"""Verify a valid SISO link over an ideal channel with single carrier modulation"""

channel = IdealChannel()
channel = IdealChannel(.9)
self.__configure_single_carrier_waveform(channel)
self.__propagate(channel)
self.__assert_link()
Expand All @@ -341,15 +350,15 @@ def test_ideal_channel_ocdm_ls_zf(self) -> None:
"""Verify a valid SISO link over an ideal channel with OCDM modulation,
least-squares channel estimation and zero-forcing equalization"""

channel = IdealChannel()
channel = IdealChannel(.9)
self.__configure_ocdm_waveform(channel)
self.__propagate(channel)
self.__assert_link()

def test_ideal_channel_ofdm(self) -> None:
"""Verify a valid SISO link over an ideal channel OFDM modulation"""

channel = IdealChannel()
channel = IdealChannel(.9)
self.__configure_ofdm_waveform(channel)
self.__propagate(channel)
self.__assert_link()
Expand All @@ -358,7 +367,7 @@ def test_ideal_channel_ofdm_ls_zf(self) -> None:
"""Verify a valid SISO link over an ideal channel with OFDM modulation,
least-squares channel estimation and zero-forcing equalization"""

channel = IdealChannel()
channel = IdealChannel(.9)
waveform = self.__configure_ofdm_waveform(channel)
waveform.channel_estimation = OrthogonalLeastSquaresChannelEstimation()
waveform.channel_equalization = OrthogonalZeroForcingChannelEqualization()
Expand All @@ -370,7 +379,7 @@ def test_ideal_channel_ofdm_schmidl_cox(self) -> None:
"""Verify a valid link over an AWGN channel with OFDM modluation,
Schmidl-Cox synchronization, least-squares channel estimation and zero-forcing equalization"""

channel = IdealChannel()
channel = IdealChannel(.9)
waveform = self.__configure_ofdm_waveform(channel)
waveform.pilot_section = SchmidlCoxPilotSection()
waveform.synchronization = SchmidlCoxSynchronization()
Expand All @@ -383,7 +392,7 @@ def test_ideal_channel_ofdm_schmidl_cox(self) -> None:
def test_ideal_channel_otfs_ls_zf(self) -> None:
"""Verify a valid SISO link over an ideal channel with OTFS modulation"""

channel = IdealChannel()
channel = IdealChannel(.9)
self.__configure_otfs_waveform(channel)
self.__propagate(channel)
self.__assert_link()
Expand Down
20 changes: 20 additions & 0 deletions tests/unit_tests/channel/test_cdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ def test_expected_realization(self) -> None:
sample: ClusterDelayLineSample = realization.sample(self.alpha_device, self.beta_device)
self._test_propagation(sample)

def test_expected_scale(self) -> None:
"""Test the expected amplitude scaling"""

unit_energy_signal = DenseSignal(np.ones((self.alpha_device.num_transmit_antennas, 100)) / 10, self.bandwidth, self.carrier_frequency)
num_attempts = 100

cumulated_propagated_energy = np.zeros((self.beta_device.num_receive_antennas), dtype=np.float_)
cumulated_expected_scale = 0.0
for _ in range(num_attempts):
realization = self.model.realize()
sample = realization.sample(self.alpha_device, self.beta_device)
propagated_signal = sample.propagate(unit_energy_signal)
cumulated_propagated_energy += propagated_signal.energy
cumulated_expected_scale += sample.expected_energy_scale

mean_propagated_energy = cumulated_propagated_energy / num_attempts
mean_expected_energy = (cumulated_expected_scale / num_attempts) ** 2

self.assertAlmostEqual(mean_propagated_energy, mean_expected_energy, delta=1e-1)

def test_propagation_time_of_flight(self) -> None:
"""Test time of flight delay simulation"""

Expand Down
Loading

0 comments on commit 30d63e3

Please sign in to comment.