From f26ffc925134bdf5f3efff47e9e98c952f376a26 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 11:22:23 +0200 Subject: [PATCH 1/8] Fix logging. --- resources/config.sample.json | 2 +- src/scicat_configuration.py | 6 +++++- src/scicat_logging.py | 11 +++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/resources/config.sample.json b/resources/config.sample.json index 66576db..437cd12 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -44,7 +44,7 @@ "file_log" : false, "file_log_base_name" : "scicat_ingestor_log", "file_log_timestamp" : false, - "loggin_level" : "INFO", + "logging_level" : "INFO", "system_log" : false, "system_log_facility" : "mail", "log_message_prefix" : " SFI ", diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index adb20f4..a9fb591 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -119,7 +119,11 @@ def build_main_arg_parser() -> argparse.ArgumentParser: default=" SFI: ", ) group.add_argument( - "--log-level", dest="log_level", help="Logging level", default="INFO", type=str + "--logging-level", + dest="log_level", + help="Logging level", + default="INFO", + type=str, ) group.add_argument( "--check-by-job-id", diff --git a/src/scicat_logging.py b/src/scicat_logging.py index 0104cbf..8fdec98 100644 --- a/src/scicat_logging.py +++ b/src/scicat_logging.py @@ -15,8 +15,15 @@ def build_logger(config: IngesterConfig) -> logging.Logger: # Build logger and formatter logger = logging.getLogger('esd extract parameters') formatter = logging.Formatter( - run_options.log_message_prefix - + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + " - ".join( + ( + run_options.log_message_prefix, + '%(asctime)s', + '%(name)s', + '%(levelname)s', + '%(message)s', + ) + ) ) # Add FileHandler From bdbed6057df75a2395bc82e643a00b9efc926b5f Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 11:50:57 +0200 Subject: [PATCH 2/8] Separate 
configurations --- resources/config.sample.json | 98 +++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/resources/config.sample.json b/resources/config.sample.json index 437cd12..0a80e6e 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -15,63 +15,67 @@ "host": "https://useroffice.host", "username": "USERNAME", "password": "PASSWORD", - "token" : "JWT_TOKEN" + "token": "JWT_TOKEN" }, "scicat": { "host": "https://scicat.host", "username": "USERNAME", "password": "PASSWORD", - "token" : "JWT_TOKEN" - }, - "graylog": { - "host" : "", - "port" : "", - "facility" : "scicat.ingestor" + "token": "JWT_TOKEN" }, + "graylog": {"host": "", "port": "", "facility": "scicat.ingestor"}, "dataset": { - "instrument_id" : "ID_OF_FALLBACK_INSTRUMENT", - "instrument" : "FALLBACK_INSTRUMENT_NAME", - "default_proposal_id" : "DEFAULT_PROPOSAL_ID", - "ownable" : { - "ownerGroup": "DEFAULT_OWNER_GROUP", - "accessGroups": ["ACCESS_GROUP_1"] + "instrument_id": "ID_OF_FALLBACK_INSTRUMENT", + "instrument": "FALLBACK_INSTRUMENT_NAME", + "default_proposal_id": "DEFAULT_PROPOSAL_ID", + "ownable": { + "ownerGroup": "DEFAULT_OWNER_GROUP", + "accessGroups": ["ACCESS_GROUP_1"] } }, "options": { - "config_file" : "config.json", - "schemas_folders" : "schemas", - "verbose" : false, - "file_log" : false, - "file_log_base_name" : "scicat_ingestor_log", - "file_log_timestamp" : false, - "logging_level" : "INFO", - "system_log" : false, - "system_log_facility" : "mail", - "log_message_prefix" : " SFI ", - "check_by_job_id" : true, + "config_file": "config.json", + "verbose": false, + "file_log": false, + "file_log_base_name": "scicat_ingestor_log", + "file_log_timestamp": false, + "logging_level": "INFO", + "system_log": false, + "system_log_facility": "mail", + "log_message_prefix": "SFI", + "check_by_job_id": true, "pyscicat": null, - "graylog" : false, - "dry_run" : false, - "retrieve_instrument_from" : "default", - 
"instrument_position_in_file_path" : 3, - "message_to_file" : true, - "message_file_extension" : ".message.json", - "message_output" : "SOURCE_FOLDER", - "hdf_structure_in_metadata" : false, // not sure if needed - "hdf_structure_to_file" : true, // not sure if needed - "hdf_structure_file_extension" : ".hdf_structure.json", // not sure if needed - "hdf_structure_output" : "SOURCE_FOLDER", // not sure if needed - "local_output_folder" : "./data", - "compute_files_stats" : true, - "compute_files_hash" : true, - "file_hash_algorithm" : "blake2b", - "save_hash_in_file" : true, - "hash_file_extension" : "b2b", - "ingestor_files_folder": "ingestor", - "dataset_pid_prefix" : "20.500.12269", - "force_dataset_pid" : true, // not sure if needed - "use_job_id_as_dataset" : true, - "beautify_metadata_keys" : false, - "metadata_levels_separator": " " // not sure if needed + "graylog": false, + "dry_run": false + }, + "ingestion_options": { + "schemas_directories": "schemas", + "retrieve_instrument_from": "default", + "instrument_position_in_file_path": 3, + "message_saving_options": { + "message_to_file": true, + "message_file_extension": ".message.json", + "message_output": "SOURCE_FOLDER" + }, + "hdf_structure_in_metadata": false, + "file_handling_options": { + "hdf_structure_to_file": true, + "hdf_structure_file_extension": ".hdf_structure.json", + "hdf_structure_output": "SOURCE_FOLDER", + "local_output_folder": "./data", + "compute_files_stats": true, + "compute_files_hash": true, + "file_hash_algorithm": "blake2b", + "save_hash_in_file": true, + "hash_file_extension": "b2b", + "ingestor_files_directory": "ingestor" + }, + "dataset_options": { + "force_dataset_pid": true, + "dataset_pid_prefix": "20.500.12269", + "use_job_id_as_dataset": true, + "beautify_metadata_keys": false, + "metadata_levels_separator": " " + } } } From ffa9daa73caaa128d39d3d16e37399f9c47ebcdf Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 11:52:02 +0200 Subject: [PATCH 3/8] Add 
h5py in the dependencies. --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 07cbddd..87d2bea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ "ess-streaming-data-types", "requests", "rich", - "graypy" + "graypy", + "h5py", ] dynamic = ["version"] From a9adf7ebceba004123286511e04cc886822e064a Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 11:52:59 +0200 Subject: [PATCH 4/8] Reorder dependencies. --- pyproject.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 87d2bea..d827ed3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,13 +28,13 @@ requires-python = ">=3.12" # Run 'tox -e deps' after making changes here. This will update requirement files. # Make sure to list one dependency per line. dependencies = [ - "kafka-python", "confluent_kafka", "ess-streaming-data-types", - "requests", - "rich", "graypy", "h5py", + "kafka-python", + "requests", + "rich" ] dynamic = ["version"] From 322978240de8a2cf171d5f96fd53dedb55a4030b Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 13:57:19 +0200 Subject: [PATCH 5/8] Separate metadata related configurations/functionalities, make skeletons of needed functions --- resources/config.sample.json | 12 ++-- src/background_ingestor.py | 103 ++++++++++++----------------------- src/scicat_configuration.py | 75 ++++++++++++++++++++++--- src/scicat_metadata.py | 67 +++++++++++++++++++++++ 4 files changed, 175 insertions(+), 82 deletions(-) diff --git a/resources/config.sample.json b/resources/config.sample.json index 0a80e6e..fc2a285 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -45,20 +45,20 @@ "log_message_prefix": "SFI", "check_by_job_id": true, "pyscicat": null, - "graylog": false, - "dry_run": false - }, + "graylog": false + }, "ingestion_options": { - "schemas_directories": "schemas", + 
"dry_run": false, + "schemas_directory": "schemas", "retrieve_instrument_from": "default", "instrument_position_in_file_path": 3, "message_saving_options": { "message_to_file": true, - "message_file_extension": ".message.json", + "message_file_extension": "message.json", "message_output": "SOURCE_FOLDER" }, - "hdf_structure_in_metadata": false, "file_handling_options": { + "hdf_structure_in_metadata": false, "hdf_structure_to_file": true, "hdf_structure_file_extension": ".hdf_structure.json", "hdf_structure_output": "SOURCE_FOLDER", diff --git a/src/background_ingestor.py b/src/background_ingestor.py index e2736be..cb4661b 100644 --- a/src/background_ingestor.py +++ b/src/background_ingestor.py @@ -4,63 +4,27 @@ import datetime import json import pathlib -import h5py -import os +import h5py +import requests from scicat_configuration import ( + BackgroundIngestorConfig, build_background_ingestor_arg_parser, - build_scicat_background_ingester_config, BackgroundIngestorConfig, + build_scicat_background_ingester_config, ) from scicat_logging import build_logger +from scicat_metadata import collect_schemas, select_applicable_schema from system_helpers import exit_at_exceptions +def replace_variables_values(url: str, values: dict) -> str: + for key, value in values.items(): + url = url.replace("{" + key + "}", str(value)) + return url -def list_schema_files(schemas_folder): - """ - return the list of the metadata schema configuration available in the folder provided - valid metadata schema configuration ends with imsc.json - imsc = ingestor metadata schema configuration - """ - return [file for file in os.listdir(schemas_folder) if file.endswith("imsc.json") and not file.startswith(".")] - - -def select_applicable_schema(nexus_file, nxs, schemas): - """ - This function evaluate which metadata schema configuration is applicable to this file. - Order of the schemas matters and first schema that is suitable is selected. 
- """ - for schema in schemas.values(): - if isinstance(schema['selector'], str): - selector_list = schema['selector'].split(':') - selector = { - "operand_1": selector_list[0], - "operation": selector_list[1], - "operand_2": selector_list[2], - } - elif isinstance(schema['selector'], dict): - selector = schema['selector'] - else: - raise Exception("Invalid type for schema selector") - - if selector['operand_1'] in [ - "filename", - "data_file", - "nexus_file", - "data_file_name", - ]: - selector['operand_1_value'] = nexus_file - - if selector['operation'] == "starts_with": - if selector['operand_1_value'].startswith(selector['operand_2']): - return schema - - raise Exception("No applicable metadata schema configuration found!!") def extract_variables_values( - variables: dict, - h5file, - config: BackgroundIngestorConfig + variables: dict, h5file, config: BackgroundIngestorConfig ) -> dict: values = {} @@ -75,26 +39,24 @@ def extract_variables_values( elif source == "SC": # build url url = replace_variables_values( - config[""]["scicat_url"] + variables[variable]["url"], - values + config[""]["scicat_url"] + variables[variable]["url"], values ) # retrieve value from SciCat response = requests.get( url, - headers = { - "token": config[""]["token"] - } + headers={"token": config[""]["token"]}, + timeout=10, # TODO: decide timeout ) # extract value value = response.json()[variables[variable]["field"]] elif source == "VALUE": # the value is the one indicated # there might be some substitution needed - value = replace_variables_values( - variables[variable]["value"], - values - ) - if "operator" in variables[variable].keys() and variables[variable]["operator"]: + value = replace_variables_values(variables[variable]["value"], values) + if ( + "operator" in variables[variable].keys() + and variables[variable]["operator"] + ): operator = variables[variable]["operator"] if operator == "join_with_space": value = ", ".join(value) @@ -102,7 +64,7 @@ def 
extract_variables_values( raise Exception("Invalid variable source configuration") value_type = variables[variable]["value_type"] - if value_type == "string": + if value_type == "string": value = str(value) elif value_type == "string[]": value = [str(v) for v in value] @@ -110,9 +72,9 @@ def extract_variables_values( value = int(value) elif value_type == "float": value = float(value) - elif value_type == "date" and isinstance(value,int): - value = datetime.datetime.fromtimestamp(value).isoformat() - elif value_type == "date" and isinstance(value,str): + elif value_type == "date" and isinstance(value, int): + value = datetime.datetime.fromtimestamp(value, tz=datetime.UTC).isoformat() + elif value_type == "date" and isinstance(value, str): value = datetime.datetime.fromisoformat(value).isoformat() values[variable] = value @@ -120,11 +82,19 @@ def extract_variables_values( return values +def prepare_scicat_dataset(schema, values): ... +def create_scicat_dataset(dataset): ... +def create_scicat_origdatablock( + scicat_dataset_pid, nexus_file=None, done_writing_message_file=None +): ... 
+ + def main() -> None: """Main entry point of the app.""" arg_parser = build_background_ingestor_arg_parser() arg_namespace = arg_parser.parse_args() config = build_scicat_background_ingester_config(arg_namespace) + ingestion_options = config.ingestion_options logger = build_logger(config) # Log the configuration as dictionary so that it is easier to read from the logs @@ -133,13 +103,8 @@ def main() -> None: ) logger.info(config.to_dict()) - # load metadata schema configurations - # list files in schema folders - schemas = {} - for schema_file in list_schema_files(): - with open(schema_file, 'r') as fh: - current_schema = json.load(fh) - schemas[current_schema['id']] = current_schema + # Collect all metadata schema configurations + schemas = collect_schemas(ingestion_options.schema_directory) with exit_at_exceptions(logger, daemon=False): nexus_file = pathlib.Path(config.single_run_options.nexus_file) @@ -175,10 +140,11 @@ def main() -> None: ) # create dataset in scicat - scicat_dataset_pid = create_Scicat_dataset(scicat_dataset) + scicat_dataset_pid = create_scicat_dataset(scicat_dataset) # create and populate scicat origdatablock entry # with files and hashes previously computed + scicat_origdatablock = create_scicat_origdatablock( scicat_dataset_pid, nexus_file, done_writing_message_file ) @@ -187,3 +153,4 @@ def main() -> None: scicat_origdatablock_id = create_scicat_origdatablock(scicat_origdatablock) # return successful code + return scicat_origdatablock_id diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index a9fb591..ddba248 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -22,15 +22,11 @@ def _load_config(config_file: Any) -> dict: def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict: """Merge configuration from the configuration file and input arguments.""" - import copy - # Overwrite deep-copied options with command line arguments - run_option_dict: dict = 
copy.deepcopy(config_dict.setdefault("options", {})) - for arg_name, arg_value in input_args_dict.items(): - if arg_value is not None: - run_option_dict[arg_name] = arg_value - - return run_option_dict + return { + **config_dict.setdefault("options", {}), + **{key: value for key, value in input_args_dict.items() if value is not None}, + } def _freeze_dict_items(d: dict) -> MappingProxyType: @@ -281,10 +277,70 @@ class SingleRunOptions: """Full path of the done writing message file that match the ``nexus_file``.""" +@dataclass +class MessageSavingOptions: + message_to_file: bool = True + """Save messages to a file.""" + message_file_extension: str = "message.json" + """Message file extension.""" + message_output: str = "SOURCE_FOLDER" + """Output directory for messages.""" + + +@dataclass +class FileHandlingOptions: + hdf_structure_in_metadata: bool = False # Not sure if needed + hdf_structure_to_file: bool = True # Not sure if needed + hdf_structure_file_extension: str = "hdf_structure.json" # Not sure if needed + hdf_structure_output: str = "SOURCE_FOLDER" # Not sure if needed + local_output_directory: str = "data" + compute_files_stats: bool = True + compute_file_hash: bool = True + file_hash_algorithm: str = "blake2b" + save_file_hash: bool = True + hash_file_extension: str = "b2b" + ingestor_files_directory: str = "ingestor" + + +@dataclass +class DatasetOptions: + force_dataset_pid: bool = True # Not sure if needed + dataset_id_prefix: str = "20.500.12269" + use_job_id_as_dataset_id: bool = True + beautify_metadata_keys: bool = False + metadata_levels_separator: str = " " + + +@dataclass +class IngestionOptions: + message_saving_options: MessageSavingOptions + file_handling_options: FileHandlingOptions + dataset_options: DatasetOptions + schema_directory: str = "schemas" + retrieve_instrument_from: str = "default" + instrument_position_in_file_path: int = 3 + + @classmethod + def from_configurations(cls, config: dict) -> "IngestionOptions": + """Create 
IngestionOptions from a dictionary.""" + return cls( + MessageSavingOptions(**config.get("message_saving_options", {})), + FileHandlingOptions(**config.get("file_handling_options", {})), + DatasetOptions(**config.get("dataset_options", {})), + schema_directory=config.get("schema_directory", "schemas"), + retrieve_instrument_from=config.get("retrieve_instrument_from", "default"), + instrument_position_in_file_path=config.get( + "instrument_position_in_file_path", 3 + ), + ) + + @dataclass class BackgroundIngestorConfig(IngesterConfig): single_run_options: SingleRunOptions """Single run configuration options for background ingestor.""" + ingestion_options: IngestionOptions + """Ingestion configuration options for background ingestor.""" def to_dict(self) -> dict: """Return the configuration as a dictionary.""" @@ -298,6 +354,7 @@ def to_dict(self) -> dict: self.kafka_options, self.graylog_options, self.single_run_options, + self.ingestion_options, ) ) @@ -313,6 +370,7 @@ def build_scicat_background_ingester_config( "done_writing_message_file": input_args_dict.pop("done_writing_message_file"), } run_option_dict = _merge_run_options(config_dict, input_args_dict) + ingestion_option_dict = config_dict.setdefault("ingestion_options", {}) # Wrap configuration in a dataclass return BackgroundIngestorConfig( @@ -321,4 +379,5 @@ def build_scicat_background_ingester_config( kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})), single_run_options=SingleRunOptions(**single_run_option_dict), graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), + ingestion_options=IngestionOptions.from_configurations(ingestion_option_dict), ) diff --git a/src/scicat_metadata.py b/src/scicat_metadata.py index baed194..3367588 100644 --- a/src/scicat_metadata.py +++ b/src/scicat_metadata.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +import json +import pathlib from 
collections.abc import Callable from importlib.metadata import entry_points @@ -10,3 +12,68 @@ def load_metadata_extractors(extractor_name: str) -> Callable: return entry_points(group="scicat_ingestor.metadata_extractor")[ extractor_name ].load() + + +def list_schema_file_names(schemas_directory: str | pathlib.Path) -> list[str]: + """ + Return a list of the metadata schema file names found in ``schemas_directory``. + + Valid metadata schema configuration ends with imsc.json + ``imsc`` stands for Ingestor Metadata Schema Configuration + """ + import os + + return [ + file_name + for file_name in os.listdir(schemas_directory) + if file_name.endswith("imsc.json") and not file_name.startswith(".") + ] + + +def _load_json_schema(schema_file_name: str) -> dict: + with open(schema_file_name) as fh: + return json.load(fh) + + +def collect_schemas(dir_path: str | pathlib.Path) -> dict: + """ + Return a dictionary of the metadata schema configurations found in ``dir_path``. + """ + return { + (schema := _load_json_schema(schema_file_name))["id"]: schema + for schema_file_name in list_schema_file_names(dir_path) + } + + +def select_applicable_schema(nexus_file, nxs, schemas): + """ + Evaluates which metadata schema configuration is applicable to ``nexus_file``. + + Order of the schemas matters and first schema that is suitable is selected. 
+ """ + for schema in schemas.values(): + if isinstance(schema['selector'], str): + selector_list = schema['selector'].split(':') + selector = { + "operand_1": selector_list[0], + "operation": selector_list[1], + "operand_2": selector_list[2], + } + elif isinstance(schema['selector'], dict): + selector = schema['selector'] + else: + raise Exception("Invalid type for schema selector") + + if selector['operand_1'] in [ + "filename", + "data_file", + "nexus_file", + "data_file_name", + ]: + selector['operand_1_value'] = nexus_file + + if selector['operation'] == "starts_with": + if selector['operand_1_value'].startswith(selector['operand_2']): + return schema + + raise Exception("No applicable metadata schema configuration found!!") From 4c8967b4c0c67a65fc0a5600f3a0e23e5f9faec2 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 14:10:11 +0200 Subject: [PATCH 6/8] Separate metadata related configurations/functionalities, make skeletons of needed functions --- config.20240405.json | 4 ++-- resources/config.sample.json | 10 +++++----- src/scicat_configuration.py | 22 +++++++++++++++------- src/scicat_logging.py | 8 ++++---- tests/test_logging.py | 4 ++-- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/config.20240405.json b/config.20240405.json index 1c09f99..04de76d 100644 --- a/config.20240405.json +++ b/config.20240405.json @@ -29,9 +29,9 @@ "config_file" : "config.json", "verbose" : false, "file_log" : false, - "log_filepath_prefix" : ".scicat_ingestor_log", + "file_log_base_name" : ".scicat_ingestor_log", "file_log_timestamp" : false, - "log_level" : "INFO", + "logging_level" : "INFO", "system_log" : false, "system_log_facility" : "mail", "log_message_prefix" : " SFI: ", diff --git a/resources/config.sample.json b/resources/config.sample.json index fc2a285..99e67fd 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -62,18 +62,18 @@ "hdf_structure_to_file": true, "hdf_structure_file_extension": 
".hdf_structure.json", "hdf_structure_output": "SOURCE_FOLDER", - "local_output_folder": "./data", - "compute_files_stats": true, - "compute_files_hash": true, + "local_output_directory": "data", + "compute_file_stats": true, + "compute_file_hash": true, "file_hash_algorithm": "blake2b", - "save_hash_in_file": true, + "save_file_hash": true, "hash_file_extension": "b2b", "ingestor_files_directory": "ingestor" }, "dataset_options": { "force_dataset_pid": true, "dataset_pid_prefix": "20.500.12269", - "use_job_id_as_dataset": true, + "use_job_id_as_dataset_id": true, "beautify_metadata_keys": false, "metadata_levels_separator": " " } diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index ddba248..b472aee 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -83,8 +83,8 @@ def build_main_arg_parser() -> argparse.ArgumentParser: default=False, ) group.add_argument( - "--log-filepath-prefix", - dest="log_filepath_prefix", + "--file-log-base-name", + dest="file_log_base_name", help="Prefix of the log file path", default=".scicat_ingestor_log", ) @@ -116,7 +116,7 @@ def build_main_arg_parser() -> argparse.ArgumentParser: ) group.add_argument( "--logging-level", - dest="log_level", + dest="logging_level", help="Logging level", default="INFO", type=str, @@ -196,11 +196,11 @@ class RunOptions: config_file: str verbose: bool file_log: bool - log_filepath_prefix: str + file_log_base_name: str file_log_timestamp: bool system_log: bool log_message_prefix: str - log_level: str + logging_level: str check_by_job_id: bool system_log_facility: str | None = None pyscicat: str | None = None @@ -221,6 +221,14 @@ class kafkaOptions: """Kafka consumer group ID.""" bootstrap_servers: list[str] | str = "localhost:9092" """List of Kafka bootstrap servers. 
Multiple servers can be separated by commas.""" + sasl_mechanism: str = "PLAIN" + """Kafka SASL mechanism.""" + sasl_username: str = "" + """Kafka SASL username.""" + sasl_password: str = "" + """Kafka SASL password.""" + ssl_ca_location: str = "" + """Kafka SSL CA location.""" individual_message_commit: bool = False """Commit for each topic individually.""" enable_auto_commit: bool = True @@ -294,7 +302,7 @@ class FileHandlingOptions: hdf_structure_file_extension: str = "hdf_structure.json" # Not sure if needed hdf_structure_output: str = "SOURCE_FOLDER" # Not sure if needed local_output_directory: str = "data" - compute_files_stats: bool = True + compute_file_stats: bool = True compute_file_hash: bool = True file_hash_algorithm: str = "blake2b" save_file_hash: bool = True @@ -305,7 +313,7 @@ class FileHandlingOptions: @dataclass class DatasetOptions: force_dataset_pid: bool = True # Not sure if needed - dataset_id_prefix: str = "20.500.12269" + dataset_pid_prefix: str = "20.500.12269" use_job_id_as_dataset_id: bool = True beautify_metadata_keys: bool = False metadata_levels_separator: str = " " diff --git a/src/scicat_logging.py b/src/scicat_logging.py index 8fdec98..e25453a 100644 --- a/src/scicat_logging.py +++ b/src/scicat_logging.py @@ -28,7 +28,7 @@ def build_logger(config: IngesterConfig) -> logging.Logger: # Add FileHandler if run_options.file_log: - file_name_components = [run_options.log_filepath_prefix] + file_name_components = [run_options.file_log_base_name] if run_options.file_log_timestamp: file_name_components.append( datetime.datetime.now(datetime.UTC).strftime('%Y%m%d%H%M%S%f') @@ -54,9 +54,9 @@ def build_logger(config: IngesterConfig) -> logging.Logger: logger.addHandler(graylog_handler) # Set the level and formatter for all handlers - logger.setLevel(run_options.log_level) + logger.setLevel(run_options.logging_level) for handler in logger.handlers: - handler.setLevel(run_options.log_level) + handler.setLevel(run_options.logging_level) 
handler.setFormatter(formatter) # Add StreamHandler @@ -64,6 +64,6 @@ def build_logger(config: IngesterConfig) -> logging.Logger: if run_options.verbose: from rich.logging import RichHandler - logger.addHandler(RichHandler(level=run_options.log_level)) + logger.addHandler(RichHandler(level=run_options.logging_level)) return logger diff --git a/tests/test_logging.py b/tests/test_logging.py index f131515..82ea9ba 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -17,12 +17,12 @@ def scicat_config(tmp_path: pathlib.Path) -> IngesterConfig: config_file='test', verbose=True, file_log=True, - log_filepath_prefix=(tmp_path / pathlib.Path('test')).as_posix(), + file_log_base_name=(tmp_path / pathlib.Path('test')).as_posix(), file_log_timestamp=True, system_log=False, system_log_facility=None, log_message_prefix='test', - log_level='DEBUG', + logging_level='DEBUG', check_by_job_id=True, pyscicat='test', ), From 8cbd9753933e013f3785347b2dc20395ae912fd9 Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 14:16:37 +0200 Subject: [PATCH 7/8] Remove unecessary variables. 
--- src/background_ingestor.py | 39 +++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/background_ingestor.py b/src/background_ingestor.py index cb4661b..6e123b9 100644 --- a/src/background_ingestor.py +++ b/src/background_ingestor.py @@ -107,30 +107,31 @@ def main() -> None: schemas = collect_schemas(ingestion_options.schema_directory) with exit_at_exceptions(logger, daemon=False): - nexus_file = pathlib.Path(config.single_run_options.nexus_file) - logger.info("Nexus file to be ingested : ") - logger.info(nexus_file) - - done_writing_message_file = pathlib.Path( - config.single_run_options.done_writing_message_file + logger.info( + "Nexus file to be ingested : %s", + (nexus_file_path := pathlib.Path(config.single_run_options.nexus_file)), + ) + logger.info( + "Done writing message file linked to nexus file : %s", + ( + done_writing_message_file := pathlib.Path( + config.single_run_options.done_writing_message_file + ) + ), ) - logger.info("Done writing message file linked to nexus file : ") - logger.info(done_writing_message_file) # open and read done writing message input file - done_writing_message = json.load(done_writing_message_file.open()) - logger.info(done_writing_message) + logger.info(json.load(done_writing_message_file.open())) # open nexus file with h5py - h5file = h5py.File(nexus_file) - - # load instrument metadata configuration - metadata_schema = select_applicable_schema(nexus_file, h5file, schemas) + with h5py.File(nexus_file_path) as h5file: + # load instrument metadata configuration + metadata_schema = select_applicable_schema(nexus_file_path, h5file, schemas) - # define variables values - variables_values = extract_variables_values( - metadata_schema['variables'], h5file, config - ) + # define variables values + variables_values = extract_variables_values( + metadata_schema['variables'], h5file, config + ) # create b2blake hash of all the files @@ -146,7 +147,7 @@ def main() -> None: # with 
files and hashes previously computed scicat_origdatablock = create_scicat_origdatablock( - scicat_dataset_pid, nexus_file, done_writing_message_file + scicat_dataset_pid, nexus_file_path, done_writing_message_file ) # create origdatablock in scicat From 974644404860a5607daa428b63de73e2c74b9f1e Mon Sep 17 00:00:00 2001 From: YooSunyoung Date: Tue, 16 Jul 2024 16:36:38 +0200 Subject: [PATCH 8/8] Fix scicat ingestor. --- resources/config.sample.json | 12 +-- src/scicat_configuration.py | 146 ++++++++++++++++++++--------------- src/scicat_ingestor.py | 119 ++++++++++++++-------------- src/scicat_kafka.py | 35 ++++++++- src/scicat_path_helpers.py | 15 ++++ tests/test_logging.py | 7 ++ 6 files changed, 208 insertions(+), 126 deletions(-) create mode 100644 src/scicat_path_helpers.py diff --git a/resources/config.sample.json b/resources/config.sample.json index 99e67fd..80bc88d 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -9,7 +9,12 @@ "ssl_ca_location": "FULL_PATH_TO_CERTIFICATE_FILE", "individual_message_commit": true, "enable_auto_commit": true, - "auto_offset_reset": "earliest" + "auto_offset_reset": "earliest", + "message_saving_options": { + "message_to_file": true, + "message_file_extension": "message.json", + "message_output": "SOURCE_FOLDER" + } }, "user_office": { "host": "https://useroffice.host", @@ -52,11 +57,6 @@ "schemas_directory": "schemas", "retrieve_instrument_from": "default", "instrument_position_in_file_path": 3, - "message_saving_options": { - "message_to_file": true, - "message_file_extension": "message.json", - "message_output": "SOURCE_FOLDER" - }, "file_handling_options": { "hdf_structure_in_metadata": false, "hdf_structure_to_file": true, diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index b472aee..610cfef 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -207,6 +207,19 @@ class RunOptions: graylog: bool = False +@dataclass(frozen=True) +class 
MessageSavingOptions: + message_to_file: bool = True + """Save messages to a file.""" + message_file_extension: str = "message.json" + """Message file extension.""" + message_output: str = "SOURCE_FOLDER" + """Output directory for messages.""" + + +DEFAULT_MESSAGE_SAVING_OPTIONS = MessageSavingOptions() + + @dataclass class kafkaOptions: """KafkaOptions dataclass to store the configuration options. @@ -235,66 +248,22 @@ class kafkaOptions: """Enable Kafka auto commit.""" auto_offset_reset: str = "earliest" """Kafka auto offset reset.""" + message_saving_options: MessageSavingOptions = DEFAULT_MESSAGE_SAVING_OPTIONS + """Message saving options.""" - -@dataclass -class IngesterConfig: - original_dict: Mapping - """Original configuration dictionary in the json file.""" - run_options: RunOptions - """Merged configuration dictionary with command line arguments.""" - kafka_options: kafkaOptions - """Kafka configuration options read from files.""" - graylog_options: GraylogOptions - """Graylog configuration options for streaming logs.""" - - def to_dict(self) -> dict: - """Return the configuration as a dictionary.""" - - return asdict( - IngesterConfig( - _recursive_deepcopy( - self.original_dict - ), # asdict does not support MappingProxyType - self.run_options, - self.kafka_options, - self.graylog_options, - ) + @classmethod + def from_configurations(cls, config: dict) -> "kafkaOptions": + """Create kafkaOptions from a dictionary.""" + return cls( + **{ + **config, + "message_saving_options": MessageSavingOptions( + **config.get("message_saving_options", {}) + ), + }, ) -def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig: - """Merge configuration from the configuration file and input arguments.""" - config_dict = _load_config(input_args.config_file) - run_option_dict = _merge_run_options(config_dict, vars(input_args)) - - # Wrap configuration in a dataclass - return IngesterConfig( - original_dict=_freeze_dict_items(config_dict), - 
run_options=RunOptions(**run_option_dict), - kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})), - graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), - ) - - -@dataclass -class SingleRunOptions: - nexus_file: str - """Full path of the input nexus file to be ingested.""" - done_writing_message_file: str - """Full path of the done writing message file that match the ``nexus_file``.""" - - -@dataclass -class MessageSavingOptions: - message_to_file: bool = True - """Save messages to a file.""" - message_file_extension: str = "message.json" - """Message file extension.""" - message_output: str = "SOURCE_FOLDER" - """Output directory for messages.""" - - @dataclass class FileHandlingOptions: hdf_structure_in_metadata: bool = False # Not sure if needed @@ -321,7 +290,6 @@ class DatasetOptions: @dataclass class IngestionOptions: - message_saving_options: MessageSavingOptions file_handling_options: FileHandlingOptions dataset_options: DatasetOptions schema_directory: str = "schemas" @@ -332,7 +300,6 @@ class IngestionOptions: def from_configurations(cls, config: dict) -> "IngestionOptions": """Create IngestionOptions from a dictionary.""" return cls( - MessageSavingOptions(**config.get("message_saving_options", {})), FileHandlingOptions(**config.get("file_handling_options", {})), DatasetOptions(**config.get("dataset_options", {})), schema_directory=config.get("schema_directory", "schemas"), @@ -343,12 +310,66 @@ def from_configurations(cls, config: dict) -> "IngestionOptions": ) +@dataclass +class IngesterConfig: + original_dict: Mapping + """Original configuration dictionary in the json file.""" + run_options: RunOptions + """Merged configuration dictionary with command line arguments.""" + kafka_options: kafkaOptions + """Kafka configuration options read from files.""" + graylog_options: GraylogOptions + """Graylog configuration options for streaming logs.""" + ingestion_options: IngestionOptions + """Ingestion configuration options 
for background ingestor.""" + + def to_dict(self) -> dict: + """Return the configuration as a dictionary.""" + + return asdict( + IngesterConfig( + _recursive_deepcopy( + self.original_dict + ), # asdict does not support MappingProxyType + self.run_options, + self.kafka_options, + self.graylog_options, + self.ingestion_options, + ) + ) + + +def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig: + """Merge configuration from the configuration file and input arguments.""" + config_dict = _load_config(input_args.config_file) + run_option_dict = _merge_run_options(config_dict, vars(input_args)) + + # Wrap configuration in a dataclass + return IngesterConfig( + original_dict=_freeze_dict_items(config_dict), + run_options=RunOptions(**run_option_dict), + kafka_options=kafkaOptions.from_configurations( + config_dict.setdefault("kafka", {}) + ), + graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), + ingestion_options=IngestionOptions.from_configurations( + config_dict.setdefault("ingestion_options", {}) + ), + ) + + +@dataclass +class SingleRunOptions: + nexus_file: str + """Full path of the input nexus file to be ingested.""" + done_writing_message_file: str + """Full path of the done writing message file that match the ``nexus_file``.""" + + @dataclass class BackgroundIngestorConfig(IngesterConfig): single_run_options: SingleRunOptions """Single run configuration options for background ingestor.""" - ingestion_options: IngestionOptions - """Ingestion configuration options for background ingestor.""" def to_dict(self) -> dict: """Return the configuration as a dictionary.""" @@ -361,8 +382,8 @@ def to_dict(self) -> dict: self.run_options, self.kafka_options, self.graylog_options, - self.single_run_options, self.ingestion_options, + self.single_run_options, ) ) @@ -379,12 +400,13 @@ def build_scicat_background_ingester_config( } run_option_dict = _merge_run_options(config_dict, input_args_dict) ingestion_option_dict = 
config_dict.setdefault("ingestion_options", {}) + kafka_option_dict = config_dict.setdefault("kafka", {}) # Wrap configuration in a dataclass return BackgroundIngestorConfig( original_dict=_freeze_dict_items(config_dict), run_options=RunOptions(**run_option_dict), - kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})), + kafka_options=kafkaOptions.from_configurations(kafka_option_dict), single_run_options=SingleRunOptions(**single_run_option_dict), graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})), ingestion_options=IngestionOptions.from_configurations(ingestion_option_dict), diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py index 5caa48d..b42fdb2 100644 --- a/src/scicat_ingestor.py +++ b/src/scicat_ingestor.py @@ -3,6 +3,8 @@ # ruff: noqa: E402, F401 import importlib.metadata +import logging +import pathlib try: __version__ = importlib.metadata.version(__package__ or __name__) @@ -11,12 +13,46 @@ del importlib -from scicat_configuration import build_main_arg_parser, build_scicat_ingester_config -from scicat_kafka import build_consumer, wrdn_messages +from scicat_configuration import ( + MessageSavingOptions, + build_main_arg_parser, + build_scicat_ingester_config, +) +from scicat_kafka import ( + WritingFinished, + build_consumer, + compose_message_path, + save_message_to_file, + wrdn_messages, +) from scicat_logging import build_logger +from scicat_path_helpers import select_target_directory from system_helpers import exit_at_exceptions +def dump_message_to_file_if_needed( + *, + logger: logging.Logger, + message_file_path: pathlib.Path, + message_saving_options: MessageSavingOptions, + message: WritingFinished, +) -> None: + """Dump the message to a file according to the configuration.""" + if not message_saving_options.message_to_file: + logger.info("Message saving to file is disabled. 
Skipping saving message.") + return + elif not message_file_path.parent.exists(): + logger.info("Message file directory not accessible. Skipping saving message.") + return + + logger.info("Message will be saved in %s", message_file_path) + save_message_to_file( + message=message, + message_file_path=message_file_path, + ) + logger.info("Message file saved") + + def main() -> None: """Main entry point of the app.""" arg_parser = build_main_arg_parser() @@ -28,6 +64,9 @@ def main() -> None: logger.info('Starting the Scicat online Ingestor with the following configuration:') logger.info(config.to_dict()) + # Often used options + message_saving_options = config.kafka_options.message_saving_options + with exit_at_exceptions(logger): # Kafka consumer if (consumer := build_consumer(config.kafka_options, logger)) is None: @@ -37,60 +76,24 @@ def main() -> None: for message in wrdn_messages(consumer, logger): logger.info("Processing message: %s", message) - # check if we have received a WRDN message - # if message is not a WRDN, we get None back + # Check if we have received a WRDN message. 
+ # ``message: None | WritingFinished`` if message: - # extract nexus file name from message - nexus_filename = message.file_name - - # extract job id from message - job_id = message.job_id - - if config["run_options"]["message_to_file"]: - # Move this to library file as it is used also in background ingestor - ingestor_files_path = ( - os.path.join( - os.path.dirname(path_name), - config["run_options"]["ingestor_files_folder"], - ) - if config["run_options"]["hdf_structure_output"] - == "SOURCE_FOLDER" - else os.path.abspath( - config["run_options"]["files_output_folder"] - ) - ) - logger.info("Ingestor files folder: {}".format(ingestor_files_path)) - - message_file_path = ingestor_files_path - logger.info( - "message file will be saved in {}".format(message_file_path) - ) - if os.path.exists(message_file_path): - message_file_name = ( - os.path.splitext(filename)[0] - + config["run_options"]["message_file_extension"] - ) - logger.info("message file name : " + message_file_name) - message_full_file_path = os.path.join( - message_file_path, message_file_name - ) - logger.info( - "message full file path : " + message_full_file_path - ) - with open(message_full_file_path, 'w') as fh: - json.dump(message, fh) - logger.info("message saved to file") - if config["run_options"]["message_output"] == "SOURCE_FOLDER": - files_list += [ - { - "path": message_full_file_path, - "size": len(json.dumps(entry)), - } - ] - fix_dataset_source_folder = True - else: - logger.info("Message file path not accessible") - + # Extract nexus file path from the message. 
+ nexus_file_path = pathlib.Path(message.file_name) + file_saving_dir = select_target_directory( + config.ingestion_options.file_handling_options, nexus_file_path + ) + dump_message_to_file_if_needed( + logger=logger, + message_saving_options=message_saving_options, + message=message, + message_file_path=compose_message_path( + target_dir=file_saving_dir, + nexus_file_path=nexus_file_path, + message_saving_options=message_saving_options, + ), + ) # instantiate a new process and runs background ingestor # on the nexus file # use open process and wait for outcome @@ -99,12 +102,14 @@ def main() -> None: -c configuration_file -f nexus_filename -j job_id - -m message_file_path + -m message_file_path # optional depending on the + # message_saving_options.message_output """ # if background process is successful # check if we need to commit the individual message """ - if config.kafka_options.individual_message_commit and background_process is successful: + if config.kafka_options.individual_message_commit \ + and background_process is successful: consumer.commit(message=message) """ diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py index 6f74781..8ed99e5 100644 --- a/src/scicat_kafka.py +++ b/src/scicat_kafka.py @@ -1,10 +1,11 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) import logging +import pathlib from collections.abc import Generator from confluent_kafka import Consumer -from scicat_configuration import kafkaOptions +from scicat_configuration import MessageSavingOptions, kafkaOptions from streaming_data_types import deserialise_wrdn from streaming_data_types.finished_writing_wrdn import ( FILE_IDENTIFIER as WRDN_FILE_IDENTIFIER, @@ -133,3 +134,35 @@ def wrdn_messages( yield _deserialise_wrdn(message_value, logger) else: yield None + + +def compose_message_path( + *, + target_dir: pathlib.Path, + nexus_file_path: pathlib.Path, + message_saving_options: MessageSavingOptions, +) -> 
pathlib.Path: + """Compose the message path based on the nexus file path and configuration.""" + + return target_dir / ( + pathlib.Path( + ".".join( + ( + nexus_file_path.stem, + message_saving_options.message_file_extension.removeprefix("."), + ) + ) + ) + ) + + +def save_message_to_file( + *, + message: WritingFinished, + message_file_path: pathlib.Path, +) -> None: + """Dump the ``message`` into ``message_file_path``.""" + import json + + with message_file_path.open("w") as fh: + json.dump(message, fh) diff --git a/src/scicat_path_helpers.py b/src/scicat_path_helpers.py new file mode 100644 index 0000000..f9ada47 --- /dev/null +++ b/src/scicat_path_helpers.py @@ -0,0 +1,15 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject) +import pathlib + +from scicat_configuration import FileHandlingOptions + + +def select_target_directory( + fh_options: FileHandlingOptions, file_path: pathlib.Path +) -> pathlib.Path: + """Select the target directory based on the file path and the options.""" + if fh_options.hdf_structure_output == "SOURCE_FOLDER": + return file_path.parent / pathlib.Path(fh_options.ingestor_files_directory) + else: + return pathlib.Path(fh_options.local_output_directory) diff --git a/tests/test_logging.py b/tests/test_logging.py index 82ea9ba..8e30445 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -2,8 +2,11 @@ import pytest from scicat_configuration import ( + DatasetOptions, + FileHandlingOptions, GraylogOptions, IngesterConfig, + IngestionOptions, RunOptions, kafkaOptions, ) @@ -28,6 +31,10 @@ def scicat_config(tmp_path: pathlib.Path) -> IngesterConfig: ), kafka_options=kafkaOptions(), graylog_options=GraylogOptions(), + ingestion_options=IngestionOptions( + file_handling_options=FileHandlingOptions(), + dataset_options=DatasetOptions(), + ), )