From e0cd770a5d87b97485c7ee09bf6c3abd134b5237 Mon Sep 17 00:00:00 2001 From: Amish Date: Fri, 8 Nov 2024 09:45:53 +0200 Subject: [PATCH 1/4] Add capture-stop unit test for some streams Also tweaked the _send_data helper and how it returned X- and B-engine results for incorporation of this test. Contributes to: NGC-1155. --- test/xbgpu/test_engine.py | 267 ++++++++++++++++++++++++++------------ 1 file changed, 187 insertions(+), 80 deletions(-) diff --git a/test/xbgpu/test_engine.py b/test/xbgpu/test_engine.py index f1802b61..ce440a7e 100644 --- a/test/xbgpu/test_engine.py +++ b/test/xbgpu/test_engine.py @@ -17,6 +17,7 @@ """Unit tests for XBEngine module.""" import asyncio +import logging from collections import Counter from collections.abc import AsyncGenerator, Callable, Iterable from itertools import chain @@ -48,6 +49,9 @@ from . import test_parameters from .test_recv import gen_heap +logger = logging.getLogger(__name__) +logging.basicConfig() + pytestmark = [pytest.mark.device_filter.with_args(device_filter)] get_baseline_index = njit(Correlation.get_baseline_index) @@ -446,7 +450,7 @@ def verify_corrprod_sensors( def verify_beam_data( beam_outputs: list[BOutput], - beam_results: np.ndarray, + beam_results: list[np.ndarray], present: np.ndarray, batch_indices: list[int], beam_params_change_batch_id: int, @@ -508,7 +512,7 @@ def verify_beam_data( # To keep it manageable, compare a batch at a time. for i in range(len(beam_outputs)): for j in range(len(batch_indices)): - np.testing.assert_allclose(expected_beams[i, j], beam_results[i, j], atol=1) + np.testing.assert_allclose(expected_beams[i, j], beam_results[i][j], atol=1) return expected_beam_saturated_low, expected_beam_saturated_high @@ -742,7 +746,7 @@ async def _send_data( n_channels_per_substream: int, frequency: int, n_spectra_per_heap: int, - ) -> tuple[list[np.ndarray], np.ndarray, list[list[int]], list[int]]: + ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray], list[list[int]], list[int]]: """Send a stream of data to the engine and retrieve the results. Each full accumulation (for each corrprod-output) requires @@ -753,12 +757,8 @@ async def _send_data( Parameters ---------- - mock_recv_stream - Fixture - mock_send_stream - Fixture - corrprod_outputs - Fixture + mock_recv_stream, mock_send_stream, corrprod_outputs, beam_outputs + Unit test fixtures. heap_factory Callback to generate heaps. It is passed a batch index and a boolean array indicating which antennas are present for that @@ -774,12 +774,13 @@ async def _send_data( Returns ------- corrprod_results - List of arrays of all GPU-generated data. One output array per - corrprod_output, where each array has shape + Dictionary of arrays of all XPipeline output. Each key is the + corrprod_output name, each value is an array with shape (n_accumulations, n_channels_per_substream, n_baselines, COMPLEX). beam_results - Beamformer output, with shape (n_beams, n_batches, - n_channels_per_substream, n_spectra_per_heap, COMPLEX). + Dictionary of arrays of all BPipeline output. Each key is the + beam_output name, each value is an array with shape + (n_batches, n_channels_per_substream, n_spectra_per_heap, COMPLEX). acc_indices List of accumulation indices for each corrprod_output. 
batch_indices @@ -811,8 +812,8 @@ async def _send_data( queue.stop() n_baselines = n_ants * (n_ants + 1) * 2 - corrprod_results = [ - np.zeros( + corrprod_results = { + corrprod_output.name: np.zeros( shape=( len(acc_index_list), # n_accumulations for this XPipeline n_channels_per_substream, @@ -821,77 +822,96 @@ async def _send_data( ), dtype=np.int32, ) - for acc_index_list in acc_indices - ] + for corrprod_output, acc_index_list in zip(corrprod_outputs, acc_indices) + } out_config = spead2.recv.StreamConfig(max_heaps=100) out_tp = spead2.ThreadPool() + output_name = None for i, corrprod_output in enumerate(corrprod_outputs): - stream = spead2.recv.asyncio.Stream(out_tp, out_config) - stream.add_inproc_reader(mock_send_stream[i]) - # It is expected that the first packet will be a descriptor. - ig_recv = spead2.ItemGroup() - heap = await stream.get() - items = ig_recv.update(heap) - assert len(list(items.values())) == 0, "This heap contains item values not just the expected descriptors." - - for j, accumulation_index in enumerate(sorted(acc_indices[i])): - # Wait for heap to be ready and then update out item group - # with the new values. + try: + output_name = corrprod_output.name + stream = spead2.recv.asyncio.Stream(out_tp, out_config) + stream.add_inproc_reader(mock_send_stream[i]) + # It is expected that the first packet will be a descriptor. + ig_recv = spead2.ItemGroup() heap = await stream.get() + items = ig_recv.update(heap) + assert ( + len(list(items.values())) == 0 + ), "This heap contains item values not just the expected descriptors." - while (updated_items := set(ig_recv.update(heap))) == set(): - # Test has gone on long enough that we've received another descriptor + for j, accumulation_index in enumerate(sorted(acc_indices[i])): + # Wait for heap to be ready and then update out item group + # with the new values. heap = await stream.get() - assert updated_items == {"frequency", "timestamp", "xeng_raw"} - # Ensure that the timestamp from the heap is what we expect. - assert ( - ig_recv["timestamp"].value % (timestamp_step * corrprod_output.heap_accumulation_threshold) == 0 - ), "Output timestamp is not a multiple of timestamp_step * heap_accumulation_threshold." - assert ( - ig_recv["timestamp"].value - == accumulation_index * timestamp_step * corrprod_output.heap_accumulation_threshold - ), ( - "Output timestamp is not correct. " - f"Expected: " - f"{hex(accumulation_index * timestamp_step * corrprod_output.heap_accumulation_threshold)}, " - f"actual: {hex(ig_recv['timestamp'].value)}." - ) + while (updated_items := set(ig_recv.update(heap))) == set(): + # Test has gone on long enough that we've received another descriptor + heap = await stream.get() + assert updated_items == {"frequency", "timestamp", "xeng_raw"} + # Ensure that the timestamp from the heap is what we expect. + assert ( + ig_recv["timestamp"].value % (timestamp_step * corrprod_output.heap_accumulation_threshold) == 0 + ), "Output timestamp is not a multiple of timestamp_step * heap_accumulation_threshold." + + assert ( + ig_recv["timestamp"].value + == accumulation_index * timestamp_step * corrprod_output.heap_accumulation_threshold + ), ( + "Output timestamp is not correct. " + f"Expected: " + f"{hex(accumulation_index * timestamp_step * corrprod_output.heap_accumulation_threshold)}, " + f"actual: {hex(ig_recv['timestamp'].value)}." + ) - assert ig_recv["frequency"].value == frequency, ( - "Output channel offset not correct. " - f"Expected: {frequency}, " - f"actual: {ig_recv['frequency'].value}." 
- ) + assert ig_recv["frequency"].value == frequency, ( + "Output channel offset not correct. " + f"Expected: {frequency}, " + f"actual: {ig_recv['frequency'].value}." + ) - corrprod_results[i][j] = ig_recv["xeng_raw"].value + corrprod_results[corrprod_output.name][j] = ig_recv["xeng_raw"].value + except spead2.Stopped: + logger.info(f"Ringbuffer has stopped for {output_name}") + pass - beam_results = np.zeros( - (len(beam_outputs), len(batch_indices), n_channels_per_substream, n_spectra_per_heap, COMPLEX), - bsend.SEND_DTYPE, - ) - for i in range(len(beam_outputs)): - stream = spead2.recv.asyncio.Stream(out_tp, out_config) - stream.add_inproc_reader(mock_send_stream[i + len(corrprod_outputs)]) - # It is expected that the first packet will be a descriptor. - ig_recv = spead2.ItemGroup() - heap = await stream.get() - items = ig_recv.update(heap) - assert len(list(items.values())) == 0, "This heap contains item values not just the expected descriptors." - - for j, index in enumerate(batch_indices): + beam_results = { + beam_output.name: np.zeros( + (len(batch_indices), n_channels_per_substream, n_spectra_per_heap, COMPLEX), + bsend.SEND_DTYPE, + ) + for beam_output in beam_outputs + } + + for i, beam_output in enumerate(beam_outputs): + try: + output_name = beam_output.name + stream = spead2.recv.asyncio.Stream(out_tp, out_config) + stream.add_inproc_reader(mock_send_stream[i + len(corrprod_outputs)]) + # It is expected that the first packet will be a descriptor. + ig_recv = spead2.ItemGroup() heap = await stream.get() - while (updated_items := set(ig_recv.update(heap))) == set(): - # Test has gone on long enough that we've received another descriptor - heap = await stream.get() + items = ig_recv.update(heap) + assert ( + len(list(items.values())) == 0 + ), "This heap contains item values not just the expected descriptors." - assert updated_items == {"frequency", "timestamp", "beam_ants", "bf_raw"} - assert ig_recv["timestamp"].value == index * timestamp_step - assert ig_recv["frequency"].value == frequency - assert ig_recv["beam_ants"].value == np.sum(present[index]) - beam_results[i, j, ...] = ig_recv["bf_raw"].value + for j, index in enumerate(batch_indices): + heap = await stream.get() + while (updated_items := set(ig_recv.update(heap))) == set(): + # Test has gone on long enough that we've received another descriptor + heap = await stream.get() + + assert updated_items == {"frequency", "timestamp", "beam_ants", "bf_raw"} + assert ig_recv["timestamp"].value == index * timestamp_step + assert ig_recv["frequency"].value == frequency + assert ig_recv["beam_ants"].value == np.sum(present[index]) + beam_results[beam_output.name][j, ...] 
= ig_recv["bf_raw"].value + except spead2.Stopped: + logger.info(f"Ringbuffer has stopped for {output_name}") + pass return corrprod_results, beam_results, acc_indices, batch_indices @@ -1171,7 +1191,7 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap verify_corrprod_data( corrprod_outputs=corrprod_outputs, - corrprod_results=corrprod_results, + corrprod_results=list(corrprod_results.values()), acc_indices=acc_indices, n_channels_per_substream=n_channels_per_substream, n_spectra_per_heap=n_spectra_per_heap, @@ -1201,7 +1221,7 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap expected_beam_saturated_low, expected_beam_saturated_high = verify_beam_data( beam_outputs=beam_outputs, - beam_results=beam_results, + beam_results=list(beam_results.values()), present=present, batch_indices=batch_indices, beam_params_change_batch_id=beam_params_change_index * HEAPS_PER_FENGINE_PER_CHUNK, @@ -1216,11 +1236,12 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap # `beam_results` holds results for each heap transmitted by a # `beam_output` for all `beam_outputs`. We can reuse its dimensions in - # the sensor verification below. + # the sensor verification below. The shape and dtype is the same for + # each beam's set of results. verify_beam_sensors( beam_outputs=beam_outputs, - beam_results_shape=beam_results.shape, - beam_dtype=beam_results.dtype, + beam_results_shape=beam_results[beam_outputs[0].name].shape, + beam_dtype=beam_results[beam_outputs[0].name].dtype, prom_diff=prom_diff, actual_sensor_updates=actual_sensor_updates, beam_request_timestamps=(first_timestamp, steady_state_timestamps[-1]), @@ -1367,7 +1388,8 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap if present[ant_index] ] - request = request_factory(beam_outputs[0].name, n_ants) + beam_under_test = beam_outputs[0].name + request = request_factory(beam_under_test, n_ants) timestamp_list = self._patch_get_in_item(monkeypatch, 4, client, [request]) n_batches = heap_accumulation_threshold[0] _, data, _, _ = await self._send_data( @@ -1391,8 +1413,8 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap steady_state_batch = steady_state_timestamp // timestamp_step assert 0 < steady_state_batch < n_batches # Should be all zeros after the steady state, but not before - np.testing.assert_equal(data[0, :steady_state_batch] != 0, True) - np.testing.assert_equal(data[0, steady_state_batch:], 0) + np.testing.assert_equal(data[beam_under_test][:steady_state_batch] != 0, True) + np.testing.assert_equal(data[beam_under_test][steady_state_batch:], 0) @DEFAULT_PARAMETERS async def test_bad_requests(self, client: aiokatcp.Client, n_ants: int) -> None: @@ -1452,3 +1474,88 @@ def get_stream_status(stream_name: str) -> bool: await client.request("capture-start", output.name) assert get_stream_status(output.name) is True, f"Stream {output.name} is still disabled" + + @DEFAULT_PARAMETERS + async def test_capture_stop_some_streams( + self, + monkeypatch: pytest.MonkeyPatch, + mock_recv_streams: list[spead2.InprocQueue], + mock_send_stream: list[spead2.InprocQueue], + xbengine: XBEngine, + client: aiokatcp.Client, + n_ants: int, + n_channels_per_substream: int, + frequency: int, + n_samples_between_spectra: int, + n_spectra_per_heap: int, + heap_accumulation_threshold: list[int], + corrprod_outputs: list[XOutput], + beam_outputs: list[BOutput], + ) -> None: + """Test capture-stop request on only some 
data streams. + + Issue a `?capture-stop` request at some point into data processing and + check the corresponding streams only have partial data transmission. + Also ensure that data is completely received for streams that did not + receive a `?capture-stop` request. + """ + n_batches = heap_accumulation_threshold[0] + all_outputs = beam_outputs + corrprod_outputs + rng = np.random.default_rng(seed=1) + # Select random subset of output data products to issue `?capture-stop` to + n_streams_to_stop = rng.integers(1, len(all_outputs)) + stop_ids = rng.choice(len(all_outputs), n_streams_to_stop, replace=False) + stopped_streams = [all_outputs[stop_id].name for stop_id in stop_ids] + + capture_stop_requests = [("capture-stop", stopped_stream) for stopped_stream in stopped_streams] + + capture_stop_chunk_id = 10 # Arbitrarily chosen + _ = self._patch_get_in_item(monkeypatch, capture_stop_chunk_id, client, requests=capture_stop_requests) + timestamp_step = n_samples_between_spectra * n_spectra_per_heap + + def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.HeapReference]: + timestamp = batch_index * timestamp_step + data = np.full((n_channels_per_substream, n_spectra_per_heap, N_POLS, COMPLEX), 10, np.int8) + return [ + spead2.send.HeapReference(gen_heap(timestamp, ant_index, frequency, data)) + for ant_index in range(n_ants) + if present[ant_index] + ] + + corrprod_results, beam_results, _, _ = await self._send_data( + mock_recv_streams, + mock_send_stream, + corrprod_outputs, + beam_outputs, + heap_factory=heap_factory, + present=np.ones((n_batches, n_ants), bool), + timestamp_step=timestamp_step, + n_channels_per_substream=n_channels_per_substream, + frequency=frequency, + n_spectra_per_heap=n_spectra_per_heap, + ) + + # NOTE: The results arrays returned are initialised as zeros, and should + # therefore still be zero for indices after the `?capture-stop` request + # was issued. That is, completely zero data is akin to no data received. + for stopped_stream in stopped_streams: + if stopped_stream in corrprod_results.keys(): + # NOTE: As per the `DEFAULT_PARAMETERS`, both corrprod_outputs have the same + # heap_accumulation_threshold, which is why we can use either one to calculate + # the accumulation index where data stopped transmitting. + np.testing.assert_equal( + corrprod_results[stopped_stream][ + capture_stop_chunk_id * HEAPS_PER_FENGINE_PER_CHUNK // heap_accumulation_threshold[0] : + ], + 0, + ) + elif stopped_stream in beam_results.keys(): + # NOTE: The `beam_results` contain data for each heap (or batch). As a result, + # we multiply the chunk ID at which `?capture-stop` was issued by + # `HEAPS_PER_FENGINE_PER_CHUNK` to get the specific heap ID at which data + # ceased transmitting for a stopped stream. + np.testing.assert_equal( + beam_results[stopped_stream][capture_stop_chunk_id * HEAPS_PER_FENGINE_PER_CHUNK :], 0 + ) + + # TODO: Check all other data is nonzero for all heaps From deec5dbd3bfed1b2d2dd6d801beb90e7e2450850 Mon Sep 17 00:00:00 2001 From: Amish Date: Fri, 8 Nov 2024 12:26:26 +0200 Subject: [PATCH 2/4] Complete test_capture_stop_some_streams And refine the test itself. Also fix verify_beam_sensors after the previous _send_data update. Contributes to: NGC-1155. 
--- test/xbgpu/test_engine.py | 45 +++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/test/xbgpu/test_engine.py b/test/xbgpu/test_engine.py index ce440a7e..5cf8955b 100644 --- a/test/xbgpu/test_engine.py +++ b/test/xbgpu/test_engine.py @@ -17,7 +17,6 @@ """Unit tests for XBEngine module.""" import asyncio -import logging from collections import Counter from collections.abc import AsyncGenerator, Callable, Iterable from itertools import chain @@ -49,9 +48,6 @@ from . import test_parameters from .test_recv import gen_heap -logger = logging.getLogger(__name__) -logging.basicConfig() - pytestmark = [pytest.mark.device_filter.with_args(device_filter)] get_baseline_index = njit(Correlation.get_baseline_index) @@ -539,8 +535,8 @@ def verify_beam_sensors( Output beam configurations parsed into BOutput objects. beam_results_shape The shape of the verified beam data for all beams with shape - (len(beam_outputs), n_beam_heaps_sent, n_channels_per_substream, - n_samples_between_spectra, COMPLEX). + (n_beam_heaps_sent, n_channels_per_substream, n_samples_between_spectra, + COMPLEX). beam_dtype The numpy data type of the beam data, used to calculate the number of bytes in each heap. @@ -563,8 +559,8 @@ def verify_beam_sensors( necessary because dithering is not modelled on the host. """ # Get the number of total heaps transmitted by each beam output - n_beam_heaps_sent = beam_results_shape[1] - heap_shape = beam_results_shape[2:] + n_beam_heaps_sent = beam_results_shape[0] + heap_shape = beam_results_shape[1:] heap_bytes = np.prod(heap_shape) * beam_dtype.itemsize # We get rid of the final dimension in the beam data as we need the total # number of (COMPLEX) samples. @@ -828,10 +824,8 @@ async def _send_data( out_config = spead2.recv.StreamConfig(max_heaps=100) out_tp = spead2.ThreadPool() - output_name = None for i, corrprod_output in enumerate(corrprod_outputs): try: - output_name = corrprod_output.name stream = spead2.recv.asyncio.Stream(out_tp, out_config) stream.add_inproc_reader(mock_send_stream[i]) # It is expected that the first packet will be a descriptor. @@ -874,7 +868,6 @@ async def _send_data( corrprod_results[corrprod_output.name][j] = ig_recv["xeng_raw"].value except spead2.Stopped: - logger.info(f"Ringbuffer has stopped for {output_name}") pass beam_results = { @@ -887,7 +880,6 @@ async def _send_data( for i, beam_output in enumerate(beam_outputs): try: - output_name = beam_output.name stream = spead2.recv.asyncio.Stream(out_tp, out_config) stream.add_inproc_reader(mock_send_stream[i + len(corrprod_outputs)]) # It is expected that the first packet will be a descriptor. @@ -910,7 +902,6 @@ async def _send_data( assert ig_recv["beam_ants"].value == np.sum(present[index]) beam_results[beam_output.name][j, ...] = ig_recv["bf_raw"].value except spead2.Stopped: - logger.info(f"Ringbuffer has stopped for {output_name}") pass return corrprod_results, beam_results, acc_indices, batch_indices @@ -1500,14 +1491,16 @@ async def test_capture_stop_some_streams( receive a `?capture-stop` request. """ n_batches = heap_accumulation_threshold[0] - all_outputs = beam_outputs + corrprod_outputs + # Naive list concatenation using '+' is not used below as mypy was not happy + # that lists of two different types could be added together.
+ all_stream_names = list(map(lambda output: output.name, [*corrprod_outputs, *beam_outputs])) rng = np.random.default_rng(seed=1) # Select random subset of output data products to issue `?capture-stop` to - n_streams_to_stop = rng.integers(1, len(all_outputs)) - stop_ids = rng.choice(len(all_outputs), n_streams_to_stop, replace=False) - stopped_streams = [all_outputs[stop_id].name for stop_id in stop_ids] + n_streams_to_stop = rng.integers(1, len(all_stream_names)) + stop_ids = rng.choice(len(all_stream_names), n_streams_to_stop, replace=False) + stopped_stream_names = [all_stream_names[stop_id] for stop_id in stop_ids] - capture_stop_requests = [("capture-stop", stopped_stream) for stopped_stream in stopped_streams] + capture_stop_requests = [("capture-stop", stopped_stream) for stopped_stream in stopped_stream_names] capture_stop_chunk_id = 10 # Arbitrarily chosen _ = self._patch_get_in_item(monkeypatch, capture_stop_chunk_id, client, requests=capture_stop_requests) @@ -1538,7 +1531,7 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap # NOTE: The results arrays returned are initialised as zeros, and should # therefore still be zero for indices after the `?capture-stop` request # was issued. That is, completely zero data is akin to no data received. - for stopped_stream in stopped_streams: + for stopped_stream in stopped_stream_names: if stopped_stream in corrprod_results.keys(): # NOTE: As per the `DEFAULT_PARAMETERS`, both corrprod_outputs have the same # heap_accumulation_threshold, which is why we can use either one to calculate @@ -1557,5 +1550,15 @@ def heap_factory(batch_index: int, present: np.ndarray) -> list[spead2.send.Heap np.testing.assert_equal( beam_results[stopped_stream][capture_stop_chunk_id * HEAPS_PER_FENGINE_PER_CHUNK :], 0 ) - - # TODO: Check all other data is nonzero for all heaps + # Remove this stopped-stream from the master list of data streams + all_stream_names.remove(stopped_stream) + + for captured_stream in all_stream_names: + if captured_stream in corrprod_results.keys(): + # NOTE: The `heap_factory` only sends real data. The results buffer has + # shape (n_accumulations, n_channels_per_substream, n_baselines, COMPLEX). + # As a result, the final dimension only has data populated for the + # 'real' (first) index. + np.testing.assert_equal(corrprod_results[captured_stream][..., 0] != 0, True) + elif captured_stream in beam_results.keys(): + np.testing.assert_equal(beam_results[captured_stream] != 0, True) From 2ac82e02ebe71f4c4441db9badacf3456af9d780 Mon Sep 17 00:00:00 2001 From: Amish Date: Fri, 8 Nov 2024 12:30:48 +0200 Subject: [PATCH 3/4] Remove unnecessary xbengine fixture from unit test Turns out it doesn't need to be around at all for the newest xbgpu unit test. Contributes to: NGC-1155. 
--- test/xbgpu/test_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/xbgpu/test_engine.py b/test/xbgpu/test_engine.py index 5cf8955b..47610a66 100644 --- a/test/xbgpu/test_engine.py +++ b/test/xbgpu/test_engine.py @@ -1472,7 +1472,6 @@ async def test_capture_stop_some_streams( monkeypatch: pytest.MonkeyPatch, mock_recv_streams: list[spead2.InprocQueue], mock_send_stream: list[spead2.InprocQueue], - xbengine: XBEngine, client: aiokatcp.Client, n_ants: int, n_channels_per_substream: int, From 45b1d91b43af50d1057e8d92e78cf77d1c4326eb Mon Sep 17 00:00:00 2001 From: Amish Date: Fri, 8 Nov 2024 12:37:32 +0200 Subject: [PATCH 4/4] Use chain to combine lists in unit test I imported it to use in the end-to-end test, so I figured I'd use it in the new test as well. Contributes to: NGC-1155. --- test/xbgpu/test_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/xbgpu/test_engine.py b/test/xbgpu/test_engine.py index 47610a66..0a5a91cb 100644 --- a/test/xbgpu/test_engine.py +++ b/test/xbgpu/test_engine.py @@ -1492,7 +1492,7 @@ async def test_capture_stop_some_streams( n_batches = heap_accumulation_threshold[0] # Naive list concatenation using '+' is not used below as mypy was not happy # that lists of two different types could be added together. - all_stream_names = list(map(lambda output: output.name, [*corrprod_outputs, *beam_outputs])) + all_stream_names = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs))) rng = np.random.default_rng(seed=1) # Select random subset of output data products to issue `?capture-stop` to n_streams_to_stop = rng.integers(1, len(all_stream_names))
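
A minimal standalone sketch of the pattern the final patch settles on. It is not part of the patches above: the FakeXOutput/FakeBOutput dataclasses and the stream names are invented for illustration. It shows how itertools.chain iterates two lists of different element types (avoiding the '+' concatenation that mypy rejects) and mirrors the reproducible random selection of streams to stop used by test_capture_stop_some_streams.

from dataclasses import dataclass
from itertools import chain

import numpy as np


@dataclass
class FakeXOutput:
    """Hypothetical stand-in for the real XOutput config object."""

    name: str


@dataclass
class FakeBOutput:
    """Hypothetical stand-in for the real BOutput config object."""

    name: str


corrprod_outputs = [FakeXOutput("corrprod0"), FakeXOutput("corrprod1")]
beam_outputs = [FakeBOutput("beam0"), FakeBOutput("beam1")]

# chain() yields items from both lists even though their element types differ,
# so no combined list type needs to be inferred.
all_stream_names = list(map(lambda output: output.name, chain(corrprod_outputs, beam_outputs)))

# Select a reproducible random subset of streams to issue `?capture-stop` to.
rng = np.random.default_rng(seed=1)
n_streams_to_stop = rng.integers(1, len(all_stream_names))
stop_ids = rng.choice(len(all_stream_names), n_streams_to_stop, replace=False)
stopped_stream_names = [all_stream_names[stop_id] for stop_id in stop_ids]
capture_stop_requests = [("capture-stop", name) for name in stopped_stream_names]
print(capture_stop_requests)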