Skip to content

Commit

Permalink
Use MonitorQueue in soft signal backend tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jsouter committed Jan 8, 2025
1 parent 9cf5e5a commit 23e9991
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 53 deletions.
6 changes: 4 additions & 2 deletions src/ophyd_async/testing/_assert.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@ def __eq__(self, value):


class MonitorQueue(AbstractContextManager):
def __init__(self, signal: SignalR):
def __init__(self, signal: SignalR, monotonic=False):
self.signal = signal
self.updates: asyncio.Queue[dict[str, Reading]] = asyncio.Queue()
self._monotonic_timestamps = monotonic

async def assert_updates(self, expected_value):
# Get an update, value and reading
Expand All @@ -159,10 +160,11 @@ async def assert_updates(self, expected_value):
# Check they match what we expected
assert value == expected_value
assert type(value) is expected_type
timestamp = time.monotonic() if self._monotonic_timestamps else time.time()
expected_reading = {
self.signal.name: {
"value": expected_value,
"timestamp": pytest.approx(time.time(), rel=0.1),
"timestamp": pytest.approx(timestamp, rel=0.1),
"alarm_severity": 0,
}
}
Expand Down
70 changes: 19 additions & 51 deletions tests/core/test_soft_signal_backend.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import asyncio
import os
import time
from collections.abc import Callable, Sequence
from typing import Any

import numpy as np
import pytest
from bluesky.protocols import Reading

from ophyd_async.core import (
Array1D,
SignalBackend,
SoftSignalBackend,
StrictEnum,
T,
soft_signal_rw,
)
from ophyd_async.testing import MonitorQueue


class MyEnum(StrictEnum):
Expand Down Expand Up @@ -44,30 +41,6 @@ def waveform_d(value):
return {"dtype": "array", "shape": [len(value)]}


class MonitorQueue:
def __init__(self, backend: SignalBackend):
self.backend = backend
self.updates: asyncio.Queue[Reading] = asyncio.Queue()
backend.set_callback(self.updates.put_nowait)

async def assert_updates(self, expected_value):
expected_reading = {
"value": expected_value,
"timestamp": pytest.approx(time.monotonic(), rel=0.1),
"alarm_severity": 0,
}
reading = await self.updates.get()

backend_value = await self.backend.get_value()
backend_reading = await self.backend.get_reading()

assert reading["value"] == expected_value == backend_value
assert reading == expected_reading == backend_reading

def close(self):
self.backend.set_callback(None)


# Can be removed once numpy >=2 is pinned.
default_int_type = (
"<i4" if os.name == "nt" and np.version.version.startswith("1.") else "<i8"
Expand All @@ -81,16 +54,16 @@ def close(self):
(float, 0.0, 43.5, number_d, "<f8"),
(str, "", "goodbye", string_d, "|S40"),
(MyEnum, MyEnum.A, MyEnum.C, enum_d, "|S40"),
(Array1D[np.int8], [], [-8, 3, 44], waveform_d, "|i1"),
(Array1D[np.uint8], [], [218], waveform_d, "|u1"),
(Array1D[np.int16], [], [-855], waveform_d, "<i2"),
(Array1D[np.uint16], [], [5666], waveform_d, "<u2"),
(Array1D[np.int32], [], [-2], waveform_d, "<i4"),
(Array1D[np.uint32], [], [1022233], waveform_d, "<u4"),
(Array1D[np.int64], [], [-3], waveform_d, "<i8"),
(Array1D[np.uint64], [], [995444], waveform_d, "<u8"),
(Array1D[np.float32], [], [1.0], waveform_d, "<f4"),
(Array1D[np.float64], [], [0.2], waveform_d, "<f8"),
(Array1D[np.int8], np.array([]), np.array([-8, 3, 44]), waveform_d, "|i1"),
(Array1D[np.uint8], np.array([]), np.array([218]), waveform_d, "|u1"),
(Array1D[np.int16], np.array([]), np.array([-855]), waveform_d, "<i2"),
(Array1D[np.uint16], np.array([]), np.array([5666]), waveform_d, "<u2"),
(Array1D[np.int32], np.array([]), np.array([-2]), waveform_d, "<i4"),
(Array1D[np.uint32], np.array([]), np.array([1022233]), waveform_d, "<u4"),
(Array1D[np.int64], np.array([]), np.array([-3]), waveform_d, "<i8"),
(Array1D[np.uint64], np.array([]), np.array([995444]), waveform_d, "<u8"),
(Array1D[np.float32], np.array([]), np.array([1.0]), waveform_d, "<f4"),
(Array1D[np.float64], np.array([]), np.array([0.2]), waveform_d, "<f8"),
(Sequence[str], [], ["nine", "ten"], waveform_d, "|S40"),
],
)
Expand All @@ -101,26 +74,21 @@ async def test_soft_signal_backend_get_put_monitor(
descriptor: Callable[[Any], dict],
dtype_numpy: str,
):
backend = SoftSignalBackend(datatype=datatype)

await backend.connect(1)
q = MonitorQueue(backend)
try:
signal = soft_signal_rw(datatype, initial_value)
await signal.connect()
backend = signal._connector.backend
with MonitorQueue(signal, monotonic=True) as q:
# Check descriptor
source = "soft://test"
# Add expected dtype_numpy to descriptor
assert dict(
source=source, **descriptor(initial_value), dtype_numpy=dtype_numpy
) == await backend.get_datakey(source)
# Check initial value
await q.assert_updates(
pytest.approx(initial_value) if initial_value != "" else initial_value
)
await q.assert_updates(initial_value)

# Put to new value and check that
await backend.put(put_value, True)
await q.assert_updates(pytest.approx(put_value))
finally:
q.close()
await backend.put(put_value, wait=True)
await q.assert_updates(put_value)


async def test_soft_signal_backend_enum_value_equivalence():
Expand Down

0 comments on commit 23e9991

Please sign in to comment.