Skip to content

Commit

Permalink
black formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
cboulay committed Oct 20, 2024
1 parent 0006008 commit a9529ff
Showing 1 changed file with 96 additions and 47 deletions.
143 changes: 96 additions & 47 deletions src/pycbsdk/cbhw/device/nsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class CBAnaInpOpts(IntEnum):
refelec_rawstream = 0x00000040
refelec_offsetcorrect = 0x00000100


class CBAOutOpts(IntEnum):
AUDIO = 0x00000001
SCALE = 0x00000002
Expand Down Expand Up @@ -229,6 +230,7 @@ class CBAInpSpk(IntFlag):
PCASORT = PCAMANSORT | PCAAUTOSORT # All PCA sorting algorithms
ALLSORT = AUTOSORT | HOOPSORT | PCASORT # All sorting algorithms


class LNCRate:
lnc_rates = {
0: 0,
Expand All @@ -237,6 +239,7 @@ class LNCRate:
30: 30000,
60: 60000,
}

@staticmethod
def GetLNCRate(key) -> int:
try:
Expand Down Expand Up @@ -437,7 +440,6 @@ def _handle_chaninfo(self, pkt):
# TODO: from CHANREPSPKTHR, .spkthrlevel
self._config["channel_infos"][pkt.chan].spkthrlevel = pkt.spkthrlevel


else:
# TODO: from CHANREPNTRODEGROUP, .spkgroup
# TODO: from CHANREPDISP, .smpdispmin, .smpdispmax, .spkdispmax, .lncdispmax
Expand Down Expand Up @@ -579,10 +581,14 @@ def _toggle_channel_ainp_flag(
if not self._send_packet(pkt=pkt, event=event, timeout=timeout):
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if (pkt.ainpopts != self._config["channel_infos"][chid].ainpopts):
raise RuntimeError("Packet response contents do not match expected values.")
if pkt.ainpopts != self._config["channel_infos"][chid].ainpopts:
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_smpgroup(
self, chid: int, attr_value: int, timeout: float = 0
Expand Down Expand Up @@ -611,14 +617,16 @@ def _configure_channel_smpgroup(
if not self._send_packet(pkt=pkt, event=event, timeout=timeout):
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if (
pkt.smpgroup != self._config["channel_infos"][chid].smpgroup
) or (
pkt.smpfilter != self._config["channel_infos"][chid].smpfilter
):
raise RuntimeError("Packet response contents do not match expected values.")
if (pkt.smpgroup != self._config["channel_infos"][chid].smpgroup) or (
pkt.smpfilter != self._config["channel_infos"][chid].smpfilter
):
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_autothreshold(
self, chid: int, attr_value: int, timeout: float = 0.0
Expand Down Expand Up @@ -677,19 +685,21 @@ def _configure_channel_hoops(self, chid: int, attr_value: dict, timeout: float =
def _configure_channel_label(self, chid: int, attr_value: str, timeout: float = 0):
pkt = copy.copy(self._config["channel_infos"][chid])
pkt.header.type = CBPacketType.CHANSETLABEL
pkt.label = bytes(create_string_buffer(attr_value.encode('utf-8'), 16))
pkt.label = bytes(create_string_buffer(attr_value.encode("utf-8"), 16))
# TODO: pkt.userflags
# TODO: pkt.position
event = self._config_events["chaninfo"] if timeout > 0 else None
if not self._send_packet(pkt=pkt, event=event, timeout=timeout):
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.label != self._config["channel_infos"][chid].label:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")


raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_lnc(self, chid: int, attr_value: int, timeout: float = 0):
pkt = copy.copy(self._config["channel_infos"][chid])
Expand All @@ -704,11 +714,17 @@ def _configure_channel_lnc(self, chid: int, attr_value: int, timeout: float = 0)
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.ainpopts != self._config["channel_infos"][chid].ainpopts:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_lnc_rate(self, chid: int, attr_value: int, timeout: float) -> None:
def _configure_channel_lnc_rate(
self, chid: int, attr_value: int, timeout: float
) -> None:
pkt = copy.copy(self._config["channel_infos"][chid])

pkt.lncrate = LNCRate.GetLNCRate(attr_value)
Expand All @@ -720,9 +736,13 @@ def _configure_channel_lnc_rate(self, chid: int, attr_value: int, timeout: float
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.lncrate != self._config["channel_infos"][chid].lncrate:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _set_lnc_global_config(
self, chid: int, attr_value: int = 60, timeout: float = 0
Expand All @@ -744,17 +764,19 @@ def _set_lnc_global_config(

if (
(pkt.lncFreq != self._config["channel_infos"][chid].lncFreq)
or (
pkt.lncRefChan != self._config["channel_infos"][chid].lncRefChan
)
or (pkt.lncRefChan != self._config["channel_infos"][chid].lncRefChan)
or (
pkt.lncGlobalMode
!= self._config["channel_infos"][chid].lncGlobalMode
)
):
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_dcoffset(
self, chid: int, attr_value: bool, timeout: float = 0.0
Expand All @@ -765,9 +787,9 @@ def _configure_channel_dcoffset(
enable=not not attr_value,
timeout=timeout,
)

def _configure_channel_spkfilter(
self, chid: int, attr_value: int, timeout: float = 0.0
self, chid: int, attr_value: int, timeout: float = 0.0
):
pkt = copy.copy(self._config["channel_infos"][chid])
pkt.spkfilter = attr_value
Expand All @@ -778,11 +800,17 @@ def _configure_channel_spkfilter(
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.spkfilter != self._config["channel_infos"][chid].spkfilter:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")

def _configure_spk_threshold(self, chid: int, attr_value: int, timeout: float = 0.0):
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_spk_threshold(
self, chid: int, attr_value: int, timeout: float = 0.0
):
pkt = copy.copy(self._config["channel_infos"][chid])
pkt.spkthrlevel = attr_value
pkt.header.type = CBPacketType.CHANSETSPKTHR
Expand All @@ -793,13 +821,16 @@ def _configure_spk_threshold(self, chid: int, attr_value: int, timeout: float =
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.spkthrlevel != self._config["channel_infos"][chid].spkthrlevel:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")

raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_analogout(
self, chid: int, attr_value: int, timeout: float = 0.0
self, chid: int, attr_value: int, timeout: float = 0.0
):
pkt = copy.copy(self._config["channel_infos"][chid])
pkt.aoutopts = attr_value
Expand All @@ -810,10 +841,14 @@ def _configure_channel_analogout(
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.aoutopts != self._config["channel_infos"][chid].aoutopts:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")

raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_digital_input(
self, chid: int, attr_value: int, timeout: float = 0.0
):
Expand All @@ -826,10 +861,14 @@ def _configure_channel_digital_input(
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.dinpopts != self._config["channel_infos"][chid].dinpopts:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")

raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_digital_output(
self, chid: int, attr_value: int, timeout: float = 0.0
):
Expand All @@ -842,9 +881,13 @@ def _configure_channel_digital_output(
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.doutopts != self._config["channel_infos"][chid].doutopts:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_smpfilter(
self, chid: int, attr_value: int, timeout: float = 0.0
Expand All @@ -858,9 +901,13 @@ def _configure_channel_smpfilter(
self.get_config(timeout=GET_CONFIG_TIMEOUT, force_refresh=True)

if pkt.smpfilter != self._config["channel_infos"][chid].smpfilter:
raise RuntimeError("Packet response contents do not match expected values.")
raise RuntimeError(
"Packet response contents do not match expected values."
)
else:
raise RuntimeError("Valid packet response NOT received, but packet contains expected values")
raise RuntimeError(
"Valid packet response NOT received, but packet contains expected values"
)

def _configure_channel_enable_spike(
self, chid: int, attr_value: bool, timeout: float = 0
Expand Down Expand Up @@ -899,7 +946,9 @@ def configure_channel(
# so let the user know, TODO: raise an exception?
print(f"{attr_name} is not a recognized name.")

def configure_all_channels(self, chtype: CBChannelType, attr_name: str, attr_value, timeout: float):
def configure_all_channels(
self, chtype: CBChannelType, attr_name: str, attr_value, timeout: float
):
for chid, ch_info in self._config["channel_infos"].items():
if self._config["channel_types"][chid] == chtype:
self.configure_channel(chid, attr_name, attr_value, timeout)
Expand Down

0 comments on commit a9529ff

Please sign in to comment.