Skip to content

Commit

Permalink
Merge pull request #40 from SciCatProject/system-helper
Browse files Browse the repository at this point in the history
Refactoring background ingestor.
  • Loading branch information
YooSunYoung authored Jul 5, 2024
2 parents 963eb32 + 2e4c889 commit 317ec57
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 125 deletions.
39 changes: 37 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,45 @@

# Scicat Filewriter Ingest

## About

A daemon that creates a raw dataset using scicat interface whenever a new file is written by a file-writer.

## How to INSTALL
```bash
git clone https://github.com/SciCatProject/scicat-filewriter-ingest.git
cd scicat-filewriter-ingest
pip install -e . # It will allow you to use entry-points of the scripts,
# defined in ``pyproject.toml``, under ``[project.scripts]`` section.
```

## How to RUN

All scripts parse the system arguments and configuration in the same way.

### Online ingestor (Highest level interface)
You can start the ingestor daemon with certain configurations.

It will continuously process `wrdn` messages and ingest the nexus files.

```bash
scicat_ingestor --verbose -c PATH_TO_CONFIGURATION_FILE.yaml
```

See [configuration](#configuration) for how to use configuration files.

### Background ingestor (Lower level interface)
You can also run the ingestor file by file.

You need to know the path to the nexus file you want to ingest
and also the path to the ``done_writing_message_file`` as a json file.

```bash
background_ingestor \\
--verbose \\
-c PATH_TO_CONFIGURATION_FILE.yaml \\
--nexus-file PATH_TO_THE_NEXUS_FILE.nxs \\
--done-writing-message-file PATH_TO_THE_MESSAGE_FILE.json
```

## Configuration

You can use a json file to configure options.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dynamic = ["version"]

[project.scripts]
scicat_ingestor = "scicat_ingestor:main"
background_ingestor = "background_ingestor:main"

[project.entry-points."scicat_ingestor.metadata_extractor"]
max = "numpy:max"
Expand Down
41 changes: 7 additions & 34 deletions src/background-ingestor.py → src/background_ingestor.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,22 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
# import scippnexus as snx
import json
import logging
import pathlib
from collections.abc import Generator
from contextlib import contextmanager

from scicat_configuration import (
build_background_ingestor_arg_parser,
build_scicat_config,
build_scicat_background_ingester_config,
)
from scicat_logging import build_logger

# import scippnexus as snx


def quit(logger: logging.Logger, unexpected: bool = True) -> None:
"""Log the message and exit the program."""
import sys

logger.info("Exiting ingestor")
sys.exit(1 if unexpected else 0)


@contextmanager
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
"""Exit the program if an exception is raised."""
try:
yield
except KeyboardInterrupt:
logger.info("Received keyboard interrupt.")
quit(logger, unexpected=False)
except Exception as e:
logger.error("An exception occurred: %s", e)
quit(logger, unexpected=True)
else:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)
from system_helpers import exit_at_exceptions


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_background_ingestor_arg_parser()
arg_namespace = arg_parser.parse_args()
config = build_scicat_config(arg_namespace)
config = build_scicat_background_ingester_config(arg_namespace)
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
Expand All @@ -52,13 +25,13 @@ def main() -> None:
)
logger.info(config.to_dict())

with exit_at_exceptions(logger):
nexus_file = arg_namespace.nexus_file
with exit_at_exceptions(logger, daemon=False):
nexus_file = pathlib.Path(config.single_run_options.nexus_file)
logger.info("Nexus file to be ingested : ")
logger.info(nexus_file)

done_writing_message_file = pathlib.Path(
arg_namespace.arg_namespace.done_writing_message_file
config.single_run_options.done_writing_message_file
)
logger.info("Done writing message file linked to nexus file : ")
logger.info(done_writing_message_file)
Expand Down
151 changes: 115 additions & 36 deletions src/scicat_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,58 @@
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import argparse
from collections.abc import Mapping
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from types import MappingProxyType
from typing import Any


def _load_config(config_file: Any) -> dict:
"""Load configuration from the configuration file path."""
import json
import pathlib

if (
isinstance(config_file, str | pathlib.Path)
and (config_file_path := pathlib.Path(config_file)).is_file()
):
return json.loads(config_file_path.read_text())
return {}


def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict:
"""Merge configuration from the configuration file and input arguments."""
import copy

# Overwrite deep-copied options with command line arguments
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
for arg_name, arg_value in input_args_dict.items():
if arg_value is not None:
run_option_dict[arg_name] = arg_value

return run_option_dict


def _freeze_dict_items(d: dict) -> MappingProxyType:
"""Freeze the dictionary to make it read-only."""
return MappingProxyType(
{
key: MappingProxyType(value) if isinstance(value, dict) else value
for key, value in d.items()
}
)


def _recursive_deepcopy(obj: Any) -> dict:
"""Recursively deep copy a dictionary."""
if not isinstance(obj, dict | MappingProxyType):
return obj

copied = dict(obj)
for key, value in copied.items():
if isinstance(value, Mapping | MappingProxyType):
copied[key] = _recursive_deepcopy(value)

return copied


def build_main_arg_parser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -96,7 +147,6 @@ def build_main_arg_parser() -> argparse.ArgumentParser:

def build_background_ingestor_arg_parser() -> argparse.ArgumentParser:
parser = build_main_arg_parser()

group = parser.add_argument_group('Scicat Background Ingestor Options')

group.add_argument(
Expand Down Expand Up @@ -180,7 +230,7 @@ class kafkaOptions:


@dataclass
class ScicatConfig:
class IngesterConfig:
original_dict: Mapping
"""Original configuration dictionary in the json file."""
run_options: RunOptions
Expand All @@ -192,50 +242,79 @@ class ScicatConfig:

def to_dict(self) -> dict:
"""Return the configuration as a dictionary."""
from dataclasses import asdict

# Deep copy the original dictionary recursively
original_dict = dict(self.original_dict)
for key, value in original_dict.items():
if isinstance(value, Mapping):
original_dict[key] = dict(value)

copied = ScicatConfig(
original_dict, self.run_options, self.kafka_options, self.graylog_options
return asdict(
IngesterConfig(
_recursive_deepcopy(
self.original_dict
), # asdict does not support MappingProxyType
self.run_options,
self.kafka_options,
self.graylog_options,
)
)
return asdict(copied)


def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig:
def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig:
"""Merge configuration from the configuration file and input arguments."""
import copy
import json
import pathlib
from types import MappingProxyType
config_dict = _load_config(input_args.config_file)
run_option_dict = _merge_run_options(config_dict, vars(input_args))

# Read configuration file
if (
input_args.config_file
and (config_file_path := pathlib.Path(input_args.config_file)).is_file()
):
config_dict = json.loads(config_file_path.read_text())
else:
config_dict = {}
# Wrap configuration in a dataclass
return IngesterConfig(
original_dict=_freeze_dict_items(config_dict),
run_options=RunOptions(**run_option_dict),
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
)


@dataclass
class SingleRunOptions:
nexus_file: str
"""Full path of the input nexus file to be ingested."""
done_writing_message_file: str
"""Full path of the done writing message file that match the ``nexus_file``."""

# Overwrite deep-copied options with command line arguments
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
for arg_name, arg_value in vars(input_args).items():
if arg_value is not None:
run_option_dict[arg_name] = arg_value

# Protect original configuration by making it read-only
for key, value in config_dict.items():
config_dict[key] = MappingProxyType(value)
@dataclass
class BackgroundIngestorConfig(IngesterConfig):
single_run_options: SingleRunOptions
"""Single run configuration options for background ingestor."""

def to_dict(self) -> dict:
"""Return the configuration as a dictionary."""

return asdict(
BackgroundIngestorConfig(
_recursive_deepcopy(
self.original_dict
), # asdict does not support MappingProxyType
self.run_options,
self.kafka_options,
self.graylog_options,
self.single_run_options,
)
)


def build_scicat_background_ingester_config(
input_args: argparse.Namespace,
) -> BackgroundIngestorConfig:
"""Merge configuration from the configuration file and input arguments."""
config_dict = _load_config(input_args.config_file)
input_args_dict = vars(input_args)
single_run_option_dict = {
"nexus_file": input_args_dict.pop("nexus_file"),
"done_writing_message_file": input_args_dict.pop("done_writing_message_file"),
}
run_option_dict = _merge_run_options(config_dict, input_args_dict)

# Wrap configuration in a dataclass
return ScicatConfig(
original_dict=MappingProxyType(config_dict),
return BackgroundIngestorConfig(
original_dict=_freeze_dict_items(config_dict),
run_options=RunOptions(**run_option_dict),
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
single_run_options=SingleRunOptions(**single_run_option_dict),
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
)
33 changes: 3 additions & 30 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,17 @@

del importlib

import logging
from collections.abc import Generator
from contextlib import contextmanager

from scicat_configuration import build_main_arg_parser, build_scicat_config
from scicat_configuration import build_main_arg_parser, build_scicat_ingester_config
from scicat_kafka import build_consumer, wrdn_messages
from scicat_logging import build_logger


def quit(logger: logging.Logger, unexpected: bool = True) -> None:
"""Log the message and exit the program."""
import sys

logger.info("Exiting ingestor")
sys.exit(1 if unexpected else 0)


@contextmanager
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
"""Exit the program if an exception is raised."""
try:
yield
except KeyboardInterrupt:
logger.info("Received keyboard interrupt.")
quit(logger, unexpected=False)
except Exception as e:
logger.error("An exception occurred: %s", e)
quit(logger, unexpected=True)
else:
logger.error("Loop finished unexpectedly.")
quit(logger, unexpected=True)
from system_helpers import exit_at_exceptions


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_main_arg_parser()
arg_namespace = arg_parser.parse_args()
config = build_scicat_config(arg_namespace)
config = build_scicat_ingester_config(arg_namespace)
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
Expand Down
4 changes: 2 additions & 2 deletions src/scicat_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import logging.handlers

import graypy
from scicat_configuration import ScicatConfig
from scicat_configuration import IngesterConfig


def build_logger(config: ScicatConfig) -> logging.Logger:
def build_logger(config: IngesterConfig) -> logging.Logger:
"""Build a logger and configure it according to the ``config``."""
run_options = config.run_options

Expand Down
Loading

0 comments on commit 317ec57

Please sign in to comment.