Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenEphysBinaryRawIO: Separate Neural and Non-Neural Data into Distinct Streams #1624

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
135 changes: 111 additions & 24 deletions neo/rawio/openephysbinaryrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def _parse_header(self):
else:
event_stream_names = []

self._num_of_signal_streams = len(sig_stream_names)

# first loop to reassign stream by "stream_index" instead of "stream_name"
self._sig_streams = {}
self._evt_streams = {}
Expand All @@ -121,45 +123,75 @@ def _parse_header(self):
# signals zone
# create signals channel map: several channel per stream
signal_channels = []

for stream_index, stream_name in enumerate(sig_stream_names):
# stream_index is the index in vector sytream names
# stream_index is the index in vector stream names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)

stream_id = str(stream_index)
buffer_id = stream_id
info = self._sig_streams[0][0][stream_index]
new_channels = []
for chan_info in info["channels"]:
chan_id = chan_info["channel_name"]

units = chan_info["units"]
if units == "":
# When units are not provided they are microvolts for neural channels and volts for ADC channels
# See https://open-ephys.github.io/gui-docs/User-Manual/Recording-data/Binary-format.html#continuous
units = "uV" if "ADC" not in chan_id else "V"

# Special cases for stream
if "SYNC" in chan_id and not self.load_sync_channel:
# the channel is removed from stream but not the buffer
stream_id = ""
if chan_info["units"] == "":
# in some cases for some OE version the unit is "", but the gain is to "uV"
units = "uV"
else:
units = chan_info["units"]

if "ADC" in chan_id:
# These are non-neural channels and their stream should be separated
# We defined their stream_id as the stream_index of neural data plus the number of neural streams
# This is to not break backwards compatbility with the stream_id numbering
stream_id = str(stream_index + len(sig_stream_names))

gain = float(chan_info["bit_volts"])
sampling_rate = float(info["sample_rate"])
offset = 0.0
new_channels.append(
(
chan_info["channel_name"],
chan_id,
float(info["sample_rate"]),
sampling_rate,
info["dtype"],
units,
chan_info["bit_volts"],
0.0,
gain,
offset,
stream_id,
buffer_id,
)
)
signal_channels.extend(new_channels)

signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)

signal_streams = []
signal_buffers = []
for stream_index, stream_name in enumerate(sig_stream_names):
stream_id = str(stream_index)
buffer_id = str(stream_index)
signal_buffers.append((stream_name, buffer_id))

unique_streams_ids = np.unique(signal_channels["stream_id"])
for stream_id in unique_streams_ids:
# Handle special case of Synch channel having stream_id empty
if stream_id == "":
continue
stream_index = int(stream_id)
# Neural signal
if stream_index < self._num_of_signal_streams:
stream_name = sig_stream_names[stream_index]
buffer_id = stream_id
# We add the buffers here as both the neural and the ADC channels are in the same buffer
signal_buffers.append((stream_name, buffer_id))
else: # This names the ADC streams
neural_stream_index = stream_index - self._num_of_signal_streams
neural_stream_name = sig_stream_names[neural_stream_index]
stream_name = f"{neural_stream_name}_ADC"
buffer_id = str(neural_stream_index)
signal_streams.append((stream_name, stream_id, buffer_id))

signal_streams = np.array(signal_streams, dtype=_signal_stream_dtype)
signal_buffers = np.array(signal_buffers, dtype=_signal_buffer_dtype)

Expand Down Expand Up @@ -192,10 +224,44 @@ def _parse_header(self):
"SYNC channel is not present in the recording. " "Set load_sync_channel to False"
)

if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id] = slice(None, -1)
# Check if ADC and non-ADC channels are contiguous
is_channel_adc = ["ADC" in ch["channel_name"] for ch in info["channels"]]
first_adc_index = is_channel_adc.index(True) if any(is_channel_adc) else len(is_channel_adc)
non_adc_channels_after_adc_channels = [not flag for flag in is_channel_adc[first_adc_index:]]
if any(non_adc_channels_after_adc_channels):
raise ValueError(
"Interleaved ADC and non-ADC channels are not supported. "
"ADC channels must be contiguous. Open an issue in python-neo to request this feature."
)

# Find sync channel and verify it's the last channel
sync_index = next(
(index for index, ch in enumerate(info["channels"]) if ch["channel_name"].endswith("_SYNC")),
None,
)
if sync_index is not None and sync_index != num_channels - 1:
raise ValueError(
"SYNC channel must be the last channel in the buffer. Open an issue in python-neo to request this feature."
zm711 marked this conversation as resolved.
Show resolved Hide resolved
)

num_neural_channels = sum(1 for ch_info in info["channels"] if "ADC" not in ch_info["channel_name"])
num_non_neural_channels = sum(1 for ch_info in info["channels"] if "ADC" in ch_info["channel_name"])

if num_non_neural_channels == 0:
if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id] = slice(None, -1)
else:
self._stream_buffer_slice[stream_id] = None
else:
self._stream_buffer_slice[stream_id] = None
stream_id_neural = stream_id
stream_id_non_neural = str(int(stream_id) + self._num_of_signal_streams)

self._stream_buffer_slice[stream_id_neural] = slice(0, num_neural_channels)

if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id_non_neural] = slice(num_neural_channels, -1)
else:
self._stream_buffer_slice[stream_id_non_neural] = slice(num_neural_channels, None)

# events zone
# channel map: one channel one stream
Expand Down Expand Up @@ -377,17 +443,33 @@ def _parse_header(self):
seg_ann = bl_ann["segments"][seg_index]

# array annotations for signal channels
for stream_index, stream_name in enumerate(sig_stream_names):
for stream_index, stream_name in enumerate(self.header["signal_streams"]["name"]):
sig_ann = seg_ann["signals"][stream_index]
info = self._sig_streams[block_index][seg_index][stream_index]
has_sync_trace = self._sig_streams[block_index][seg_index][stream_index]["has_sync_trace"]
if stream_index < self._num_of_signal_streams:
_sig_stream_index = stream_index
is_neural_stream = True
else:
_sig_stream_index = stream_index - self._num_of_signal_streams
is_neural_stream = False
info = self._sig_streams[block_index][seg_index][_sig_stream_index]
has_sync_trace = self._sig_streams[block_index][seg_index][_sig_stream_index]["has_sync_trace"]

for key in ("identifier", "history", "source_processor_index", "recorded_processor_index"):
if key in info["channels"][0]:
values = np.array([chan_info[key] for chan_info in info["channels"]])

for k in ("identifier", "history", "source_processor_index", "recorded_processor_index"):
if k in info["channels"][0]:
values = np.array([chan_info[k] for chan_info in info["channels"]])
if has_sync_trace:
values = values[:-1]
sig_ann["__array_annotations__"][k] = values

num_neural_channels = sum(
1 for ch_info in info["channels"] if "ADC" not in ch_info["channel_name"]
)
if is_neural_stream:
values = values[:num_neural_channels]
else:
values = values[num_neural_channels:]

sig_ann["__array_annotations__"][key] = values

# array annotations for event channels
# use other possible data in _possible_event_stream_names
Expand Down Expand Up @@ -431,7 +513,12 @@ def _channels_to_group_id(self, channel_indexes):
return group_id

def _get_signal_t_start(self, block_index, seg_index, stream_index):
t_start = self._sig_streams[block_index][seg_index][stream_index]["t_start"]
if stream_index < self._num_of_signal_streams:
_sig_stream_index = stream_index
else:
_sig_stream_index = stream_index - self._num_of_signal_streams

t_start = self._sig_streams[block_index][seg_index][_sig_stream_index]["t_start"]
return t_start

def _spike_count(self, block_index, seg_index, unit_index):
Expand Down
11 changes: 11 additions & 0 deletions neo/test/rawiotest/test_openephysbinaryrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TestOpenEphysBinaryRawIO(BaseTestRawIO, unittest.TestCase):
"openephysbinary/v0.6.x_neuropixels_multiexp_multistream",
"openephysbinary/v0.6.x_neuropixels_with_sync",
"openephysbinary/v0.6.x_neuropixels_missing_folders",
"openephysbinary/neural_and_non_neural_data_mixed"
]

def test_sync(self):
Expand Down Expand Up @@ -78,6 +79,16 @@ def test_multiple_ttl_events_parsing(self):
assert np.allclose(ttl_events["durations"][ttl_events["labels"] == "6"], 0.025, atol=0.001)
assert np.allclose(ttl_events["durations"][ttl_events["labels"] == "7"], 0.016666, atol=0.001)

def test_separating_stream_for_non_neural_data(self):
rawio = OpenEphysBinaryRawIO(
self.get_local_path("openephysbinary/neural_and_non_neural_data_mixed"), load_sync_channel=False
)
rawio.parse_header()
# Check that the non-neural data stream is correctly separated
assert len(rawio.header["signal_streams"]["name"]) == 2
assert rawio.header["signal_streams"]["name"].tolist() == ["Rhythm_FPGA-100.0", "Rhythm_FPGA-100.0_ADC"]



if __name__ == "__main__":
unittest.main()
Loading