Add unit test to capture-stop some streams #883
base: main
Conversation
Also tweaked the _send_data helper and how it returned X- and B-engine results for incorporation of this test. Contributes to: NGC-1155.
And refine the test itself. Also fix verify_beam_sensors after the previous _send_data update. Contributes to: NGC-1155.
Turns out it doesn't need to be around at all for the newest xbgpu unit test. Contributes to: NGC-1155.
I imported it to use in the end-to-end test, figured I'd use it in the new test as well. Contributes to: NGC-1155.
except spead2.Stopped:
    pass
I'm not overjoyed with my deadpan handling of the Ringbuffer Stopped exception. I'm going to defer to the expert on any suggestions. I'll be sure to push something if/when I think of it.
I'm also not wild about it. My main worry is that if something causes a Stopped exception in one of the other tests, you're going to have zeros in the results and you'll get a data mismatch, which will be misleading and make the failure harder to debug.
My inclination would be to pass extra information into send_data to indicate when it should expect to run out of data. It can then directly verify that this is actually when data stops flowing: after receiving all the expected data, it could try to receive another heap inside a pytest.raises context manager (which might still need to handle descriptors).
It might also be nice to then size the results array to the amount of data actually expected, instead of returning arrays with trailing zeros. That's actually what I thought you were going to do when I saw you'd replaced a big array with a dictionary of arrays. I realise this could have knock-on implications for all the verification functions though, so if it's going to be big pain then don't bother.
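A minimal sketch of that receive-then-expect-stopped pattern might look like the following. The Stopped exception and FakeStream here are stand-ins for the real spead2 objects (which aren't shown in this thread), and a plain try/except replaces pytest.raises so the sketch is self-contained:

```python
import asyncio


class Stopped(Exception):
    """Stand-in for spead2.Stopped (assumed name, not the real class)."""


class FakeStream:
    """Fake receive stream that raises Stopped once its heaps run out."""

    def __init__(self, heaps):
        self._heaps = list(heaps)

    async def get(self):
        if not self._heaps:
            raise Stopped
        return self._heaps.pop(0)


async def recv_expected(stream, n_expected):
    """Receive exactly n_expected heaps, then verify the stream has stopped."""
    heaps = [await stream.get() for _ in range(n_expected)]
    # Equivalent of `with pytest.raises(Stopped):` without the pytest dependency:
    try:
        await stream.get()
    except Stopped:
        pass
    else:
        raise AssertionError("data kept flowing past the expected stop point")
    return heaps


received = asyncio.run(recv_expected(FakeStream(["heap0", "heap1"]), 2))
assert received == ["heap0", "heap1"]
```

This way a Stopped exception raised at the wrong moment fails loudly instead of silently leaving zeros in the results.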
I realise this could have knock-on implications for all the verification functions though, so if it's going to be big pain then don't bother.
Your proposed suggestion is exactly what I intended to do. I stopped short of that because I didn't want my changes to be too wholesale. Still, I see it's tending that way anyway, even with just an "expect data to stop here" parameter. Might as well go all in.
# lists of two different types could be added together.
all_stream_names = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs)))
rng = np.random.default_rng(seed=1)
# Select random subset of output data products to issue `?capture-stop` to
The last part of the comment seems to have gotten lost somewhere...
Looks fine to me, if a little ungrammatical.
@@ -446,7 +446,7 @@ def verify_corrprod_sensors(

 def verify_beam_data(
     beam_outputs: list[BOutput],
-    beam_results: np.ndarray,
+    beam_results: list[np.ndarray],
Need to update the docstring to match.
-    List of arrays of all GPU-generated data. One output array per
-    corrprod_output, where each array has shape
+    Dictionary of arrays of all XPipeline output. Each key is the
+    corrprod_output name, each value is an array with shape
Comma splice
-    Beamformer output, with shape (n_beams, n_batches,
-    n_channels_per_substream, n_spectra_per_heap, COMPLEX).
+    Dictionary of arrays of all BPipeline output. Each key is the
+    beam_output name, each value is an array with shape
Comma splice
# Naive list concatenation using '+' not used below as mypy was not happy
# lists of two different types could be added together.
all_stream_names = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs)))
Really? Mypy seems happy with the following for me:
-    # Naive list concatenation using '+' not used below as mypy was not happy
-    # lists of two different types could be added together.
-    all_stream_names = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs)))
+    all_stream_names = [output.name for output in corrprod_outputs + beam_outputs]
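For what it's worth, a self-contained sketch of the two approaches (with hypothetical XOutput/BOutput stand-ins and made-up stream names, since the real classes aren't shown in this thread) confirms they produce the same names:

```python
from dataclasses import dataclass
from itertools import chain


@dataclass
class XOutput:  # hypothetical stand-in for the real corrprod output class
    name: str


@dataclass
class BOutput:  # hypothetical stand-in for the real beam output class
    name: str


corrprod_outputs = [XOutput("corrprod0")]
beam_outputs = [BOutput("beam0"), BOutput("beam1")]

# map/lambda over chain() avoids concatenating lists of two different types...
via_chain = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs)))
# ...but '+' also type-checks (mypy infers a list of the union type), and a
# comprehension over the concatenation reads more naturally:
via_concat = [output.name for output in corrprod_outputs + beam_outputs]

assert via_chain == via_concat == ["corrprod0", "beam0", "beam1"]
```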
# the accumulation index where data stopped transmitting.
np.testing.assert_equal(
    corrprod_results[stopped_stream][
        capture_stop_chunk_id * HEAPS_PER_FENGINE_PER_CHUNK // heap_accumulation_threshold[0] :
This line wrapping makes this whole statement hard to read. I'd suggest pulling the calculation into a separate statement (assigning it to a variable).
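A sketch of that refactor, using made-up stand-in values for the test's constants and a hypothetical stream name:

```python
import numpy as np

# Made-up stand-in values for the test's constants:
HEAPS_PER_FENGINE_PER_CHUNK = 8
heap_accumulation_threshold = [4]
capture_stop_chunk_id = 3
stopped_stream = "corrprod0"  # hypothetical stream name
corrprod_results = {stopped_stream: np.concatenate([np.ones(6), np.zeros(4)])}

# Pulling the index calculation into its own statement keeps the slice readable:
stop_accum_index = (
    capture_stop_chunk_id * HEAPS_PER_FENGINE_PER_CHUNK // heap_accumulation_threshold[0]
)
assert stop_accum_index == 6
# Everything from the capture-stop point onwards should be zero:
np.testing.assert_equal(corrprod_results[stopped_stream][stop_accum_index:], 0)
```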
for captured_stream in all_stream_names:
    if captured_stream in corrprod_results.keys():
        # NOTE: The `heap_factory` only sends real data. The results buffer has
Um, it looks to me like it's sending 10+10j. Perhaps you mean that the correlations that result are purely real?
Yes, apologies, I'm referring to the results. I noticed that when inspecting the corrprod_results ndarray.
heap = await stream.get()
items = ig_recv.update(heap)
assert (
    len(list(items.values())) == 0
I realise you haven't actually changed this code, just reflowed it, but I think this could just be
-    len(list(items.values())) == 0
+    len(items) == 0
That would probably allow it to go back to being a single line instead of awkwardly wrapped.
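The equivalence is easy to check: len() on a dict already counts its entries, so the list(...) round-trip is redundant:

```python
# spead2's ItemGroup.update() returns a dict of updated items; for an
# emptiness check, len() on the dict itself is enough.
items = {}
assert len(list(items.values())) == 0
assert len(items) == 0  # same check, simpler and shorter

items = {"timestamp": 123, "payload": b"\x00"}
assert len(list(items.values())) == len(items) == 2
```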
@@ -1201,7 +1212,7 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap

 expected_beam_saturated_low, expected_beam_saturated_high = verify_beam_data(
     beam_outputs=beam_outputs,
-    beam_results=beam_results,
+    beam_results=list(beam_results.values()),
This makes me slightly nervous: it depends on the iteration order of the beam_results dictionary matching the order of beam_outputs. That is guaranteed with the way you've written the code, but it's an implicit assumption that someone might break and then spend time debugging. Some alternative ideas:
-    beam_results=list(beam_results.values()),
+    beam_results=[beam_results[output] for output in beam_outputs],
- Change the parameter in verify_beam_data to a dictionary.
- Add a comment where beam_results is defined to indicate that the iteration order is significant.
Same comment applies to the verify_corrprod_data call.
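To illustrate the implicit assumption (with made-up names throughout): dicts preserve insertion order, so the two forms agree as the code stands today, but only the explicit lookup survives a reordering:

```python
class Output:  # hypothetical stand-in for BOutput
    def __init__(self, name):
        self.name = name


beam_outputs = [Output("beam0"), Output("beam1")]
# Built by iterating over beam_outputs, so insertion order happens to match:
beam_results = {"beam0": [1.0], "beam1": [2.0]}

implicit = list(beam_results.values())  # relies on dict insertion order
explicit = [beam_results[output.name] for output in beam_outputs]  # order-proof
assert implicit == explicit == [[1.0], [2.0]]

# If someone later builds beam_results in a different order, only the
# explicit form still lines up with beam_outputs:
reordered = {"beam1": [2.0], "beam0": [1.0]}
assert [reordered[output.name] for output in beam_outputs] == [[1.0], [2.0]]
assert list(reordered.values()) != [[1.0], [2.0]]
```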
Noted all around, and I appreciate you flagging this. I must admit I had similar concerns, but hadn't considered this particular case.

Regarding

- Change the parameter in verify_beam_data to a dictionary

I'm not quite seeing why that needs to be done if you've suggested a list comprehension for the beam_results values? That is, [my_dict[this_key] for this_key in my_keys] just results in a list of values (at least in how I've tested the logic in an ipython session).

EDIT: I know your suggestion is shorthand, but I have to add that beam_outputs is type list[BOutput], so that would be [beam_results[output.name] for output in ...] - if that affects your suggestion at all.
I meant do one of those things, not all of them.
Randomly select data streams in the XB-engine to issue a ?capture-stop request to at some point during data processing. Verify that those stopped streams only have data up to the capture-stop point, and that other streams have processed and transmitted data uninterrupted.
Checklist (if not applicable, edit to add (N/A) and mark as done):
- setup.cfg and .pre-commit-config.yaml
- sphinx-apidoc -efo doc/ src/ to update files in doc/
- fake_servers.py in katsdpcontroller to match

Closes NGC-1155.