diff --git a/src/ophyd_async/epics/adandor/__init__.py b/src/ophyd_async/epics/adandor/__init__.py new file mode 100644 index 0000000000..8b945eafd6 --- /dev/null +++ b/src/ophyd_async/epics/adandor/__init__.py @@ -0,0 +1,9 @@ +from ._andor import Andor2Detector +from ._andor_controller import Andor2Controller +from ._andor_io import Andor2DriverIO + +__all__ = [ + "Andor2Detector", + "Andor2Controller", + "Andor2DriverIO", +] diff --git a/src/ophyd_async/epics/adandor/_andor.py b/src/ophyd_async/epics/adandor/_andor.py new file mode 100644 index 0000000000..21d89e312f --- /dev/null +++ b/src/ophyd_async/epics/adandor/_andor.py @@ -0,0 +1,45 @@ +from collections.abc import Sequence + +from ophyd_async.core import PathProvider +from ophyd_async.core._signal import SignalR +from ophyd_async.epics import adcore + +from ._andor_controller import Andor2Controller +from ._andor_io import Andor2DriverIO + + +class Andor2Detector(adcore.AreaDetector[Andor2Controller]): + """ + Andor 2 area detector device (CCD detector 56fps with full chip readout). + Andor model:DU897_BV. + """ + + def __init__( + self, + prefix: str, + path_provider: PathProvider, + drv_suffix="cam1:", + writer_cls: type[adcore.ADWriter] = adcore.ADHDFWriter, + fileio_suffix: str | None = None, + name: str = "", + config_sigs: Sequence[SignalR] = (), + plugins: dict[str, adcore.NDPluginBaseIO] | None = None, + ): + driver = Andor2DriverIO(prefix + drv_suffix) + controller = Andor2Controller(driver) + + writer = writer_cls.with_io( + prefix, + path_provider, + dataset_source=driver, + fileio_suffix=fileio_suffix, + plugins=plugins, + ) + + super().__init__( + controller=controller, + writer=writer, + plugins=plugins, + name=name, + config_sigs=config_sigs, + ) diff --git a/src/ophyd_async/epics/adandor/_andor_controller.py b/src/ophyd_async/epics/adandor/_andor_controller.py new file mode 100644 index 0000000000..b0b5756fe1 --- /dev/null +++ b/src/ophyd_async/epics/adandor/_andor_controller.py @@ -0,0 +1,49 @@ +import asyncio + +from ophyd_async.core import ( + DetectorTrigger, + TriggerInfo, +) +from ophyd_async.epics import adcore + +from ._andor_io import Andor2DriverIO, Andor2TriggerMode + +_MIN_DEAD_TIME = 0.1 +_MAX_NUM_IMAGE = 999_999 + + +class Andor2Controller(adcore.ADBaseController[Andor2DriverIO]): + def __init__( + self, + driver: Andor2DriverIO, + good_states: frozenset[adcore.DetectorState] = adcore.DEFAULT_GOOD_STATES, + ) -> None: + super().__init__(driver, good_states=good_states) + + def get_deadtime(self, exposure: float | None) -> float: + return _MIN_DEAD_TIME + (exposure or 0) + + async def prepare(self, trigger_info: TriggerInfo): + await self.set_exposure_time_and_acquire_period_if_supplied( + trigger_info.livetime + ) + await asyncio.gather( + self.driver.trigger_mode.set(self._get_trigger_mode(trigger_info.trigger)), + self.driver.num_images.set( + trigger_info.total_number_of_triggers or _MAX_NUM_IMAGE + ), + self.driver.image_mode.set(adcore.ImageMode.MULTIPLE), + ) + + def _get_trigger_mode(self, trigger: DetectorTrigger) -> Andor2TriggerMode: + supported_trigger_types = { + DetectorTrigger.INTERNAL: Andor2TriggerMode.INTERNAL, + DetectorTrigger.EDGE_TRIGGER: Andor2TriggerMode.EXT_TRIGGER, + } + if trigger not in supported_trigger_types: + raise ValueError( + f"{self.__class__.__name__} only supports the following trigger " + f"types: {supported_trigger_types} but was asked to " + f"use {trigger}" + ) + return supported_trigger_types[trigger] diff --git a/src/ophyd_async/epics/adandor/_andor_io.py b/src/ophyd_async/epics/adandor/_andor_io.py new file mode 100644 index 0000000000..aa32ac5e1d --- /dev/null +++ b/src/ophyd_async/epics/adandor/_andor_io.py @@ -0,0 +1,36 @@ +from ophyd_async.core import StrictEnum, SubsetEnum +from ophyd_async.epics.adcore import ADBaseIO +from ophyd_async.epics.core import ( + epics_signal_r, + epics_signal_rw, +) + + +class Andor2TriggerMode(StrictEnum): + INTERNAL = "Internal" + EXT_TRIGGER = "External" + EXT_START = "External Start" + EXT_EXPOSURE = "External Exposure" + EXT_FVP = "External FVP" + SOFTWARE = "Software" + + +class Andor2DataType(SubsetEnum): + UINT16 = "UInt16" + UINT32 = "UInt32" + FLOAT32 = "Float32" + FLOAT64 = "Float64" + + +class Andor2DriverIO(ADBaseIO): + """ + Epics pv for andor model:DU897_BV as deployed on p99 + """ + + def __init__(self, prefix: str, name: str = "") -> None: + super().__init__(prefix, name=name) + self.trigger_mode = epics_signal_rw(Andor2TriggerMode, prefix + "TriggerMode") + self.data_type = epics_signal_r(Andor2DataType, prefix + "DataType_RBV") + self.andor_accumulate_period = epics_signal_r( + float, prefix + "AndorAccumulatePeriod_RBV" + ) diff --git a/src/ophyd_async/epics/adcore/_utils.py b/src/ophyd_async/epics/adcore/_utils.py index ec56efb723..dd54b6236f 100644 --- a/src/ophyd_async/epics/adcore/_utils.py +++ b/src/ophyd_async/epics/adcore/_utils.py @@ -6,6 +6,7 @@ SignalR, SignalRW, StrictEnum, + SubsetEnum, wait_for_value, ) @@ -84,7 +85,7 @@ class FileWriteMode(StrictEnum): STREAM = "Stream" -class ImageMode(StrictEnum): +class ImageMode(SubsetEnum): SINGLE = "Single" MULTIPLE = "Multiple" CONTINUOUS = "Continuous" diff --git a/tests/epics/adandor/test_andor.py b/tests/epics/adandor/test_andor.py new file mode 100644 index 0000000000..debced3a05 --- /dev/null +++ b/tests/epics/adandor/test_andor.py @@ -0,0 +1,123 @@ +from typing import cast +from unittest.mock import AsyncMock, patch + +import pytest +from event_model import StreamDatum, StreamResource + +from ophyd_async.core import ( + DetectorTrigger, + PathProvider, + TriggerInfo, +) +from ophyd_async.epics import adandor + + +@pytest.fixture +def test_adandor(ad_standard_det_factory) -> adandor.Andor2Detector: + return ad_standard_det_factory(adandor.Andor2Detector) + + +@pytest.mark.parametrize("exposure_time", [0.0, 0.1, 1.0, 10.0, 100.0]) +async def test_deadtime_from_exposure_time( + exposure_time: float, + test_adandor: adandor.Andor2Detector, +): + assert test_adandor._controller.get_deadtime(exposure_time) == exposure_time + 0.1 + + +async def test_hints_from_hdf_writer(test_adandor: adandor.Andor2Detector): + assert test_adandor.hints == {"fields": ["test_adandor21"]} + + +async def test_can_read(test_adandor: adandor.Andor2Detector): + # Standard detector can be used as Readable + assert (await test_adandor.read()) == {} + + +async def test_decribe_describes_writer_dataset( + test_adandor: adandor.Andor2Detector, one_shot_trigger_info: TriggerInfo +): + assert await test_adandor.describe() == {} + await test_adandor.stage() + await test_adandor.prepare(one_shot_trigger_info) + assert await test_adandor.describe() == { + "test_adandor21": { + "source": "mock+ca://ANDOR21:HDF1:FullFileName_RBV", + "shape": [10, 10], + "dtype": "array", + "dtype_numpy": "