diff --git a/src/firefly/controller.py b/src/firefly/controller.py index b088f376..5e9b6048 100644 --- a/src/firefly/controller.py +++ b/src/firefly/controller.py @@ -334,7 +334,7 @@ async def finalize_new_window(self, action): await action.window.update_devices(self.registry) @asyncSlot(QAction) - async def finalize_run_browser_window(self, action): + async def finalize_run_browser_window(self, action: QAction): """Connect up run browser signals and load initial data.""" display = action.display self.run_updated.connect(display.update_running_scan) @@ -346,13 +346,13 @@ async def finalize_run_browser_window(self, action): tiled_client=client, catalog_name=config["default_catalog"] ) - def finalize_status_window(self, action): + def finalize_status_window(self, action: QAction): """Connect up signals that are specific to the voltmeters window.""" display = action.display display.ui.bss_modify_button.clicked.connect(self.actions.bss.trigger) # display.details_window_requested.connect - def finalize_voltmeter_window(self, action): + def finalize_voltmeter_window(self, action: QAction): """Connect up signals that are specific to the voltmeters window.""" def launch_ion_chamber_window(ic_name): diff --git a/src/haven/catalog.py b/src/haven/catalog.py index 98138381..c11f3e64 100644 --- a/src/haven/catalog.py +++ b/src/haven/catalog.py @@ -1,16 +1,15 @@ import asyncio import functools import logging -import os -import sqlite3 -import threading import warnings +from collections.abc import Sequence from concurrent.futures import ThreadPoolExecutor from functools import partial -from typing import Sequence +from pathlib import Path import numpy as np -from tiled.client import from_uri +from tiled.client import from_profile +from tiled.client.base import BaseClient from tiled.client.cache import Cache from ._iconfig import load_config @@ -78,84 +77,47 @@ def wrapper(obj, *args, **kwargs): return wrapper -class ThreadSafeCache(Cache): - """Equivalent to the regular cache, but thread-safe. - - Ensures that sqlite3 is built with concurrency features, and - ensures that no two write operations happen concurrently. - - """ - - def __init__( - self, - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - self._lock = threading.Lock() - - def write_safe(self): - """ - Check that it is safe to write. - - SQLite is not threadsafe for concurrent _writes_. - """ - is_main_thread = threading.current_thread().ident == self._owner_thread - sqlite_is_safe = sqlite3.threadsafety == 1 - return is_main_thread or sqlite_is_safe - - # Wrap the accessor methods so they wait for the lock - clear = with_thread_lock(Cache.clear) - set = with_thread_lock(Cache.set) - get = with_thread_lock(Cache.get) - delete = with_thread_lock(Cache.delete) - - -DEFAULT_NODE = object() +class DEFAULT: + pass def tiled_client( - catalog: str = DEFAULT_NODE, - uri: str = None, - cache_filepath=None, - structure_clients="numpy", -): - """Load a tiled client for retrieving data from databses. + catalog: str | type[DEFAULT] | None = DEFAULT, + profile: str = "haven", + cache_filepath: Path | type[DEFAULT] | None = DEFAULT, + structure_clients: str = "numpy", +) -> BaseClient: + """Load a Tiled client with some default options. Parameters ========== catalog - The node within the catalog to return, by default this will be - read from the config file. If ``None``, the root container will - be return containing all catalogs. - uri - The location of the tiled server, e.g. "http://localhost:8000". + If not None, load a specific catalog within the client. By + default, the iconfig.toml file will be consulted for the value + of ``tiled.default_catalog``. + profile + Use a specific Tiled profile. If not provided, the default Tiled + profile will be used. cache_filepath - Where to keep a local cache of tiled nodes. - structure_clients - "numpy" for immediate retrieval of data, "dask" for just-in-time - retrieval. + The path on which to store a cache of downloaded data. If + omitted, the iconfig.toml file will be consulted for the value + of ``tiled.cache_filepath``. """ - tiled_config = load_config().get("tiled", {}) - # Create a cache for saving local copies - if cache_filepath is None: - cache_filepath = tiled_config.get("cache_filepath", "") - if os.access(cache_filepath, os.W_OK): - cache = ThreadSafeCache(filepath=cache_filepath) - else: - warnings.warn(f"Cache file is not writable: {cache_filepath}") - cache = None + # Get default values from the database + tiled_config = load_config()["tiled"] + if cache_filepath is DEFAULT: + cache_filepath = tiled_config.get("cache_filepath") + if catalog is DEFAULT: + catalog = tiled_config.get("default_catalog") # Create the client - if uri is None: - uri = tiled_config["uri"] - api_key = tiled_config.get("api_key") - client_ = from_uri(uri, structure_clients, api_key=api_key) - if catalog is DEFAULT_NODE: - client_ = client_[tiled_config["default_catalog"]] - elif catalog is not None: - client_ = client_[catalog] - return client_ + kw = {} + if cache_filepath is not None: + kw["cache"] = Cache(cache_filepath) + client = from_profile(profile, structure_clients=structure_clients, **kw) + if catalog is not None: + client = client[catalog] + return client class CatalogScan: @@ -175,7 +137,7 @@ def stream_names(self): return list(self.container.keys()) @run_in_executor - def _read_data(self, signals: Sequence | None, dataset: str): + def _read_data(self, signals: Sequence[str] | None, dataset: str): data = self.container[dataset] if signals is None: return data.read() diff --git a/src/haven/iconfig_testing.toml b/src/haven/iconfig_testing.toml index 186febe9..d141c2e4 100644 --- a/src/haven/iconfig_testing.toml +++ b/src/haven/iconfig_testing.toml @@ -34,10 +34,8 @@ servers = ["fedorov.xray.aps.anl.gov:9092"] topic = "bluesky.documents.haven-dev" [tiled] -uri = "http://localhost:8337/" -default_catalog = "haven-dev" -cache_filepath = "/tmp/tiled_cache/http_response_cache.db" -api_key = "" +default_catalog = "testing" +cache_filepath = "/tmp/tiled/http_response_cache.db" [database.databroker] catalog = "bluesky" diff --git a/src/haven/ipython_startup.ipy b/src/haven/ipython_startup.ipy index c6f339f9..5bb9cd15 100644 --- a/src/haven/ipython_startup.ipy +++ b/src/haven/ipython_startup.ipy @@ -3,7 +3,6 @@ import logging import time import databroker # noqa: F401 -import matplotlib.pyplot as plt # noqa: F401 from bluesky import plan_stubs as bps # noqa: F401 from bluesky.plan_stubs import mv, mvr, rd # noqa: F401 from bluesky import plans as bp # noqa: F401 @@ -14,7 +13,6 @@ from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop # noqa: F4 from bluesky.simulators import summarize_plan # noqa: F401 from ophyd_async.core import NotConnected from ophydregistry import ComponentNotFound -import matplotlib.pyplot as plt from rich import print from rich.align import Align from rich.console import Console @@ -28,6 +26,9 @@ logging.basicConfig(level=logging.WARNING) log = logging.getLogger(__name__) +# Load the Tiled catalog for reading data back outline +catalog = haven.tiled_client() + # Create a run engine RE = haven.run_engine( connect_kafka=True, @@ -41,9 +42,6 @@ RE = haven.run_engine( # devices get stuck from ophyd_async.core import DeviceCollector # noqa: F401 -# Allow best effort callback to update properly -plt.ion() - # Prepare the haven instrument config = haven.load_config() t0 = time.monotonic() diff --git a/src/queueserver/tiled_consumer.py b/src/queueserver/tiled_consumer.py index 94a9e163..ee4e0dbc 100755 --- a/src/queueserver/tiled_consumer.py +++ b/src/queueserver/tiled_consumer.py @@ -1,3 +1,5 @@ +#!/bin/env python + import logging import sys from typing import Mapping, Sequence @@ -5,11 +7,9 @@ import msgpack from bluesky.callbacks.tiled_writer import TiledWriter from bluesky_kafka import BlueskyConsumer -from tiled.client import from_uri +from tiled.client import from_profile from tiled.client.base import BaseClient -import haven - log = logging.getLogger(__name__) @@ -96,18 +96,15 @@ def process_document(self, topic: str, name: str, doc: Mapping) -> bool: def main(): """Launch the tiled consumer.""" logging.basicConfig(level=logging.INFO) - config = haven.load_config() - bootstrap_servers = ["localhost:9092"] + bootstrap_servers = ["fedorov.xray.aps.anl.gov:9092"] topic_catalog_map = { - "25idc.bluesky.documents": "haven", - "25idd.bluesky.documents": "haven", - "25idc-dev.bluesky.documents": "haven-dev", - "25idd-dev.bluesky.documents": "haven-dev", + "25idc.bluesky.documents": "scans", + "25idd.bluesky.documents": "scans", + "25idc-dev.bluesky.documents": "testing", + "25idd-dev.bluesky.documents": "testing", } # Create a tiled writer that will write documents to tiled - tiled_uri = config["tiled"]["uri"] - tiled_api_key = config["tiled"]["api_key"] - client = from_uri(tiled_uri, api_key=tiled_api_key, include_data_sources=True) + client = from_profile("haven", include_data_sources=True) # Create a Tiled consumer that will listen for new documents. consumer = TiledConsumer(