-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite sim and epics.demo devices to do the same
- Loading branch information
Showing
41 changed files
with
934 additions
and
1,052 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,16 @@ | ||
"""Demo EPICS Devices for the tutorial""" | ||
|
||
import atexit | ||
import random | ||
import string | ||
import subprocess | ||
import sys | ||
from pathlib import Path | ||
|
||
from ._mover import Mover, SampleStage | ||
from ._sensor import EnergyMode, Sensor, SensorGroup | ||
from ._ioc import start_ioc_subprocess | ||
from ._motor import DemoMotor | ||
from ._point_detector import DemoPointDetector | ||
from ._point_detector_channel import DemoPointDetectorChannel, EnergyMode | ||
from ._stage import DemoStage | ||
|
||
__all__ = [ | ||
"Mover", | ||
"SampleStage", | ||
"DemoMotor", | ||
"DemoStage", | ||
"EnergyMode", | ||
"Sensor", | ||
"SensorGroup", | ||
"DemoPointDetectorChannel", | ||
"DemoPointDetector", | ||
"start_ioc_subprocess", | ||
] | ||
|
||
|
||
def start_ioc_subprocess() -> str: | ||
"""Start an IOC subprocess with EPICS database for sample stage and sensor | ||
with the same pv prefix | ||
""" | ||
|
||
pv_prefix = "".join(random.choice(string.ascii_uppercase) for _ in range(12)) + ":" | ||
here = Path(__file__).absolute().parent | ||
args = [sys.executable, "-m", "epicscorelibs.ioc"] | ||
|
||
# Create standalone sensor | ||
args += ["-m", f"P={pv_prefix}"] | ||
args += ["-d", str(here / "sensor.db")] | ||
|
||
# Create sensor group | ||
for suffix in ["1", "2", "3"]: | ||
args += ["-m", f"P={pv_prefix}{suffix}:"] | ||
args += ["-d", str(here / "sensor.db")] | ||
|
||
# Create X and Y motors | ||
for suffix in ["X", "Y"]: | ||
args += ["-m", f"P={pv_prefix}{suffix}:"] | ||
args += ["-d", str(here / "mover.db")] | ||
|
||
# Start IOC | ||
process = subprocess.Popen( | ||
args, | ||
stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
universal_newlines=True, | ||
) | ||
atexit.register(process.communicate, "exit") | ||
return pv_prefix |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import asyncio | ||
from typing import Annotated as A | ||
|
||
import numpy as np | ||
from bluesky.protocols import Movable, Stoppable | ||
|
||
from ophyd_async.core import ( | ||
CALCULATE_TIMEOUT, | ||
DEFAULT_TIMEOUT, | ||
CalculatableTimeout, | ||
SignalR, | ||
SignalRW, | ||
SignalX, | ||
StandardReadable, | ||
WatchableAsyncStatus, | ||
WatcherUpdate, | ||
observe_value, | ||
) | ||
from ophyd_async.core import StandardReadableFormat as Format | ||
from ophyd_async.epics.core import EpicsDevice, PvSuffix | ||
|
||
|
||
class DemoMotor(EpicsDevice, StandardReadable, Movable, Stoppable): | ||
"""A demo movable that moves based on velocity""" | ||
|
||
# Whether set() should complete successfully or not | ||
_set_success = True | ||
# Define some signals | ||
readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL] | ||
velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL] | ||
units: A[SignalR[str], PvSuffix("Readback.EGU"), Format.CONFIG_SIGNAL] | ||
setpoint: A[SignalRW[float], PvSuffix("Setpoint")] | ||
precision: A[SignalR[int], PvSuffix("Readback.PREC")] | ||
# If a signal name clashes with a bluesky verb add _ to the attribute name | ||
stop_: A[SignalX, PvSuffix("Stop.PROC")] | ||
|
||
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None: | ||
super().set_name(name, child_name_separator=child_name_separator) | ||
# Readback should be named the same as its parent in read() | ||
self.readback.set_name(name) | ||
|
||
@WatchableAsyncStatus.wrap | ||
async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT): | ||
new_position = value | ||
self._set_success = True | ||
old_position, units, precision, velocity = await asyncio.gather( | ||
self.setpoint.get_value(), | ||
self.units.get_value(), | ||
self.precision.get_value(), | ||
self.velocity.get_value(), | ||
) | ||
if timeout == CALCULATE_TIMEOUT: | ||
timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT | ||
# Wait for the value to set, but don't wait for put completion callback | ||
await self.setpoint.set(new_position, wait=False) | ||
async for current_position in observe_value( | ||
self.readback, done_timeout=timeout | ||
): | ||
yield WatcherUpdate( | ||
current=current_position, | ||
initial=old_position, | ||
target=new_position, | ||
name=self.name, | ||
unit=units, | ||
precision=precision, | ||
) | ||
if np.isclose(current_position, new_position): | ||
break | ||
if not self._set_success: | ||
raise RuntimeError("Motor was stopped") | ||
|
||
async def stop(self, success=True): | ||
self._set_success = success | ||
status = self.stop_.trigger() | ||
await status |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.