Skip to content

Commit

Permalink
Converted the tiled_client factory to use Tiled profiles. (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
canismarko authored Feb 7, 2025
1 parent bb1ed1b commit ccbf125
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 97 deletions.
6 changes: 3 additions & 3 deletions src/firefly/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ async def finalize_new_window(self, action):
await action.window.update_devices(self.registry)

@asyncSlot(QAction)
async def finalize_run_browser_window(self, action):
async def finalize_run_browser_window(self, action: QAction):
"""Connect up run browser signals and load initial data."""
display = action.display
self.run_updated.connect(display.update_running_scan)
Expand All @@ -346,13 +346,13 @@ async def finalize_run_browser_window(self, action):
tiled_client=client, catalog_name=config["default_catalog"]
)

def finalize_status_window(self, action):
def finalize_status_window(self, action: QAction):
"""Connect up signals that are specific to the voltmeters window."""
display = action.display
display.ui.bss_modify_button.clicked.connect(self.actions.bss.trigger)
# display.details_window_requested.connect

def finalize_voltmeter_window(self, action):
def finalize_voltmeter_window(self, action: QAction):
"""Connect up signals that are specific to the voltmeters window."""

def launch_ion_chamber_window(ic_name):
Expand Down
108 changes: 35 additions & 73 deletions src/haven/catalog.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio
import functools
import logging
import os
import sqlite3
import threading
import warnings
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Sequence
from pathlib import Path

import numpy as np
from tiled.client import from_uri
from tiled.client import from_profile
from tiled.client.base import BaseClient
from tiled.client.cache import Cache

from ._iconfig import load_config
Expand Down Expand Up @@ -78,84 +77,47 @@ def wrapper(obj, *args, **kwargs):
return wrapper


class ThreadSafeCache(Cache):
"""Equivalent to the regular cache, but thread-safe.
Ensures that sqlite3 is built with concurrency features, and
ensures that no two write operations happen concurrently.
"""

def __init__(
self,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._lock = threading.Lock()

def write_safe(self):
"""
Check that it is safe to write.
SQLite is not threadsafe for concurrent _writes_.
"""
is_main_thread = threading.current_thread().ident == self._owner_thread
sqlite_is_safe = sqlite3.threadsafety == 1
return is_main_thread or sqlite_is_safe

# Wrap the accessor methods so they wait for the lock
clear = with_thread_lock(Cache.clear)
set = with_thread_lock(Cache.set)
get = with_thread_lock(Cache.get)
delete = with_thread_lock(Cache.delete)


DEFAULT_NODE = object()
class DEFAULT:
pass


def tiled_client(
catalog: str = DEFAULT_NODE,
uri: str = None,
cache_filepath=None,
structure_clients="numpy",
):
"""Load a tiled client for retrieving data from databses.
catalog: str | type[DEFAULT] | None = DEFAULT,
profile: str = "haven",
cache_filepath: Path | type[DEFAULT] | None = DEFAULT,
structure_clients: str = "numpy",
) -> BaseClient:
"""Load a Tiled client with some default options.
Parameters
==========
catalog
The node within the catalog to return, by default this will be
read from the config file. If ``None``, the root container will
be return containing all catalogs.
uri
The location of the tiled server, e.g. "http://localhost:8000".
If not None, load a specific catalog within the client. By
default, the iconfig.toml file will be consulted for the value
of ``tiled.default_catalog``.
profile
Use a specific Tiled profile. If not provided, the default Tiled
profile will be used.
cache_filepath
Where to keep a local cache of tiled nodes.
structure_clients
"numpy" for immediate retrieval of data, "dask" for just-in-time
retrieval.
The path on which to store a cache of downloaded data. If
omitted, the iconfig.toml file will be consulted for the value
of ``tiled.cache_filepath``.
"""
tiled_config = load_config().get("tiled", {})
# Create a cache for saving local copies
if cache_filepath is None:
cache_filepath = tiled_config.get("cache_filepath", "")
if os.access(cache_filepath, os.W_OK):
cache = ThreadSafeCache(filepath=cache_filepath)
else:
warnings.warn(f"Cache file is not writable: {cache_filepath}")
cache = None
# Get default values from the database
tiled_config = load_config()["tiled"]
if cache_filepath is DEFAULT:
cache_filepath = tiled_config.get("cache_filepath")
if catalog is DEFAULT:
catalog = tiled_config.get("default_catalog")
# Create the client
if uri is None:
uri = tiled_config["uri"]
api_key = tiled_config.get("api_key")
client_ = from_uri(uri, structure_clients, api_key=api_key)
if catalog is DEFAULT_NODE:
client_ = client_[tiled_config["default_catalog"]]
elif catalog is not None:
client_ = client_[catalog]
return client_
kw = {}
if cache_filepath is not None:
kw["cache"] = Cache(cache_filepath)
client = from_profile(profile, structure_clients=structure_clients, **kw)
if catalog is not None:
client = client[catalog]
return client


class CatalogScan:
Expand All @@ -175,7 +137,7 @@ def stream_names(self):
return list(self.container.keys())

@run_in_executor
def _read_data(self, signals: Sequence | None, dataset: str):
def _read_data(self, signals: Sequence[str] | None, dataset: str):
data = self.container[dataset]
if signals is None:
return data.read()
Expand Down
6 changes: 2 additions & 4 deletions src/haven/iconfig_testing.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ servers = ["fedorov.xray.aps.anl.gov:9092"]
topic = "bluesky.documents.haven-dev"

[tiled]
uri = "http://localhost:8337/"
default_catalog = "haven-dev"
cache_filepath = "/tmp/tiled_cache/http_response_cache.db"
api_key = "<tiled-secret-api-key-for-writing>"
default_catalog = "testing"
cache_filepath = "/tmp/tiled/http_response_cache.db"

[database.databroker]
catalog = "bluesky"
Expand Down
8 changes: 3 additions & 5 deletions src/haven/ipython_startup.ipy
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import logging
import time

import databroker # noqa: F401
import matplotlib.pyplot as plt # noqa: F401
from bluesky import plan_stubs as bps # noqa: F401
from bluesky.plan_stubs import mv, mvr, rd # noqa: F401
from bluesky import plans as bp # noqa: F401
Expand All @@ -14,7 +13,6 @@ from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop # noqa: F4
from bluesky.simulators import summarize_plan # noqa: F401
from ophyd_async.core import NotConnected
from ophydregistry import ComponentNotFound
import matplotlib.pyplot as plt
from rich import print
from rich.align import Align
from rich.console import Console
Expand All @@ -28,6 +26,9 @@ logging.basicConfig(level=logging.WARNING)

log = logging.getLogger(__name__)

# Load the Tiled catalog for reading data back outline
catalog = haven.tiled_client()

# Create a run engine
RE = haven.run_engine(
connect_kafka=True,
Expand All @@ -41,9 +42,6 @@ RE = haven.run_engine(
# devices get stuck
from ophyd_async.core import DeviceCollector # noqa: F401

# Allow best effort callback to update properly
plt.ion()

# Prepare the haven instrument
config = haven.load_config()
t0 = time.monotonic()
Expand Down
21 changes: 9 additions & 12 deletions src/queueserver/tiled_consumer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#!/bin/env python

import logging
import sys
from typing import Mapping, Sequence

import msgpack
from bluesky.callbacks.tiled_writer import TiledWriter
from bluesky_kafka import BlueskyConsumer
from tiled.client import from_uri
from tiled.client import from_profile
from tiled.client.base import BaseClient

import haven

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -96,18 +96,15 @@ def process_document(self, topic: str, name: str, doc: Mapping) -> bool:
def main():
"""Launch the tiled consumer."""
logging.basicConfig(level=logging.INFO)
config = haven.load_config()
bootstrap_servers = ["localhost:9092"]
bootstrap_servers = ["fedorov.xray.aps.anl.gov:9092"]
topic_catalog_map = {
"25idc.bluesky.documents": "haven",
"25idd.bluesky.documents": "haven",
"25idc-dev.bluesky.documents": "haven-dev",
"25idd-dev.bluesky.documents": "haven-dev",
"25idc.bluesky.documents": "scans",
"25idd.bluesky.documents": "scans",
"25idc-dev.bluesky.documents": "testing",
"25idd-dev.bluesky.documents": "testing",
}
# Create a tiled writer that will write documents to tiled
tiled_uri = config["tiled"]["uri"]
tiled_api_key = config["tiled"]["api_key"]
client = from_uri(tiled_uri, api_key=tiled_api_key, include_data_sources=True)
client = from_profile("haven", include_data_sources=True)

# Create a Tiled consumer that will listen for new documents.
consumer = TiledConsumer(
Expand Down

0 comments on commit ccbf125

Please sign in to comment.