Schema selector refactor #45

Merged · 8 commits · Jul 16, 2024
4 changes: 2 additions & 2 deletions config.20240405.json
@@ -29,9 +29,9 @@
"config_file" : "config.json",
"verbose" : false,
"file_log" : false,
"log_filepath_prefix" : ".scicat_ingestor_log",
"file_log_base_name" : ".scicat_ingestor_log",
"file_log_timestamp" : false,
"log_level" : "INFO",
"logging_level" : "INFO",
"system_log" : false,
"system_log_facility" : "mail",
"log_message_prefix" : " SFI: ",
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -28,12 +28,13 @@ requires-python = ">=3.12"
# Run 'tox -e deps' after making changes here. This will update requirement files.
# Make sure to list one dependency per line.
dependencies = [
"kafka-python",
"confluent_kafka",
"ess-streaming-data-types",
"graypy",
"h5py",
"kafka-python",
"requests",
"rich",
"graypy"
"rich"
]

dynamic = ["version"]
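The dependency change above adds confluent_kafka next to the existing kafka-python. As a purely illustrative sketch of what the new library is for in an ingestor like this (consuming "done writing" notifications), with an invented broker address, group id, and topic name:

```python
# Illustrative sketch only: this PR adds the dependency but contains no
# consumer code. Broker, group id, and topic below are placeholders.
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # hypothetical broker
        "group.id": "scicat-ingestor",          # hypothetical consumer group
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["done_writing"])  # hypothetical topic name

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to 1 s for a message
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.value())  # raw bytes of a "done writing" notification
finally:
    consumer.close()
```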
98 changes: 51 additions & 47 deletions resources/config.sample.json
@@ -15,63 +15,67 @@
"host": "https://useroffice.host",
"username": "USERNAME",
"password": "PASSWORD",
"token" : "JWT_TOKEN"
"token": "JWT_TOKEN"
},
"scicat": {
"host": "https://scicat.host",
"username": "USERNAME",
"password": "PASSWORD",
"token" : "JWT_TOKEN"
},
"graylog": {
"host" : "",
"port" : "",
"facility" : "scicat.ingestor"
"token": "JWT_TOKEN"
},
"graylog": {"host": "", "port": "", "facility": "scicat.ingestor"},
"dataset": {
"instrument_id" : "ID_OF_FALLBACK_INSTRUMENT",
"instrument" : "FALLBACK_INSTRUMENT_NAME",
"default_proposal_id" : "DEFAULT_PROPOSAL_ID",
"ownable" : {
"ownerGroup": "DEFAULT_OWNER_GROUP",
"accessGroups": ["ACCESS_GROUP_1"]
"instrument_id": "ID_OF_FALLBACK_INSTRUMENT",
"instrument": "FALLBACK_INSTRUMENT_NAME",
"default_proposal_id": "DEFAULT_PROPOSAL_ID",
"ownable": {
"ownerGroup": "DEFAULT_OWNER_GROUP",
"accessGroups": ["ACCESS_GROUP_1"]
}
},
"options": {
"config_file" : "config.json",
"schemas_folders" : "schemas",
"verbose" : false,
"file_log" : false,
"file_log_base_name" : "scicat_ingestor_log",
"file_log_timestamp" : false,
"loggin_level" : "INFO",
"system_log" : false,
"system_log_facility" : "mail",
"log_message_prefix" : " SFI ",
"check_by_job_id" : true,
"config_file": "config.json",
"verbose": false,
"file_log": false,
"file_log_base_name": "scicat_ingestor_log",
"file_log_timestamp": false,
"logging_level": "INFO",
"system_log": false,
"system_log_facility": "mail",
"log_message_prefix": "SFI",
"check_by_job_id": true,
"pyscicat": null,
"graylog" : false,
"dry_run" : false,
"retrieve_instrument_from" : "default",
"instrument_position_in_file_path" : 3,
"message_to_file" : true,
"message_file_extension" : ".message.json",
"message_output" : "SOURCE_FOLDER",
"hdf_structure_in_metadata" : false, // not sure if needed
"hdf_structure_to_file" : true, // not sure if needed
"hdf_structure_file_extension" : ".hdf_structure.json", // not sure if needed
"hdf_structure_output" : "SOURCE_FOLDER", // not sure if needed
"local_output_folder" : "./data",
"compute_files_stats" : true,
"compute_files_hash" : true,
"file_hash_algorithm" : "blake2b",
"save_hash_in_file" : true,
"hash_file_extension" : "b2b",
"ingestor_files_folder": "ingestor",
"dataset_pid_prefix" : "20.500.12269",
"force_dataset_pid" : true, // not sure if needed
"use_job_id_as_dataset" : true,
"beautify_metadata_keys" : false,
"metadata_levels_separator": " " // not sure if needed
"graylog": false
},
"ingestion_options": {
"dry_run": false,
"schemas_directory": "schemas",
"retrieve_instrument_from": "default",
"instrument_position_in_file_path": 3,
"message_saving_options": {
"message_to_file": true,
"message_file_extension": "message.json",
"message_output": "SOURCE_FOLDER"
},
"file_handling_options": {
"hdf_structure_in_metadata": false,
"hdf_structure_to_file": true,
"hdf_structure_file_extension": ".hdf_structure.json",
"hdf_structure_output": "SOURCE_FOLDER",
"local_output_directory": "data",
"compute_file_stats": true,
"compute_file_hash": true,
"file_hash_algorithm": "blake2b",
"save_file_hash": true,
"hash_file_extension": "b2b",
"ingestor_files_directory": "ingestor"
},
"dataset_options": {
"force_dataset_pid": true,
"dataset_pid_prefix": "20.500.12269",
"use_job_id_as_dataset_id": true,
"beautify_metadata_keys": false,
"metadata_levels_separator": " "
}
Comment from Contributor (PR author): I separated these to make it easier to find/use/update the options.
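As a reading aid, here is a minimal sketch of how the grouped `ingestion_options` block could map onto typed configuration objects; the dataclass names and the loader below are illustrative assumptions, not the actual `scicat_configuration` API:

```python
# Illustrative sketch; not the actual scicat_configuration classes.
# Each sub-dict of "ingestion_options" maps onto one small typed object,
# which is the benefit of the grouping described in the comment above.
import json
from dataclasses import dataclass


@dataclass
class MessageSavingOptions:
    message_to_file: bool
    message_file_extension: str
    message_output: str


@dataclass
class IngestionOptions:
    dry_run: bool
    schemas_directory: str
    message_saving_options: MessageSavingOptions


def load_ingestion_options(config_path: str) -> IngestionOptions:
    """Read a config.sample.json-style file and build typed options."""
    with open(config_path) as f:
        raw = json.load(f)["ingestion_options"]
    return IngestionOptions(
        dry_run=raw["dry_run"],
        schemas_directory=raw["schemas_directory"],
        message_saving_options=MessageSavingOptions(**raw["message_saving_options"]),
    )
```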

}
}
142 changes: 55 additions & 87 deletions src/background_ingestor.py
@@ -4,63 +4,27 @@
import datetime
import json
import pathlib
- import h5py
- import os

+ import h5py
import requests
from scicat_configuration import (
+ BackgroundIngestorConfig,
build_background_ingestor_arg_parser,
- build_scicat_background_ingester_config, BackgroundIngestorConfig,
+ build_scicat_background_ingester_config,
)
from scicat_logging import build_logger
+ from scicat_metadata import collect_schemas, select_applicable_schema
from system_helpers import exit_at_exceptions


def replace_variables_values(url: str, values: dict) -> str:
for key, value in values.items():
url = url.replace("{" + key + "}", str(value))
return url

- def list_schema_files(schemas_folder):
-     """
-     return the list of the metadata schema configuration available in the folder provided
-     valid metadata schema configuration ends with imsc.json
-     imsc = ingestor metadata schema configuration
-     """
-     return [file for file in os.listdir(schemas_folder) if file.endswith("imsc.json") and not file.startswith(".")]


- def select_applicable_schema(nexus_file, nxs, schemas):
-     """
-     This function evaluate which metadata schema configuration is applicable to this file.
-     Order of the schemas matters and first schema that is suitable is selected.
-     """
-     for schema in schemas.values():
-         if isinstance(schema['selector'], str):
-             selector_list = schema['selector'].split(':')
-             selector = {
-                 "operand_1": selector_list[0],
-                 "operation": selector_list[1],
-                 "operand_2": selector_list[2],
-             }
-         elif isinstance(schema['selector'], dict):
-             selector = schema['selector']
-         else:
-             raise Exception("Invalid type for schema selector")
-
-         if selector['operand_1'] in [
-             "filename",
-             "data_file",
-             "nexus_file",
-             "data_file_name",
-         ]:
-             selector['operand_1_value'] = nexus_file
-
-         if selector['operation'] == "starts_with":
-             if selector['operand_1_value'].startswith(selector['operand_2']):
-                 return schema
-
-     raise Exception("No applicable metadata schema configuration found!!")
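Both helper functions above move into the new `scicat_metadata` module (see the added import at the top of the file). As a hedged reconstruction for readers, not the actual module contents, the refactored selector logic could look roughly like this, assuming the same `operand_1:operation:operand_2` string format and `starts_with` operation as the removed code:

```python
# Hedged reconstruction of the refactored selector; the real implementation
# lives in scicat_metadata and may differ.
from collections.abc import Mapping


def parse_selector(raw) -> dict:
    """Normalize a schema selector into a dict with operand/operation keys."""
    if isinstance(raw, str):
        operand_1, operation, operand_2 = raw.split(":", maxsplit=2)
        return {"operand_1": operand_1, "operation": operation, "operand_2": operand_2}
    if isinstance(raw, Mapping):
        return dict(raw)
    raise ValueError(f"Invalid type for schema selector: {type(raw)}")


def select_applicable_schema(nexus_file, h5file, schemas: dict) -> dict:
    """Return the first schema whose selector matches; order matters."""
    # h5file is unused by the string selectors shown here.
    filename_aliases = {"filename", "data_file", "nexus_file", "data_file_name"}
    for schema in schemas.values():
        selector = parse_selector(schema["selector"])
        operand_1 = (
            str(nexus_file)
            if selector["operand_1"] in filename_aliases
            else selector["operand_1"]
        )
        if selector["operation"] == "starts_with" and operand_1.startswith(
            selector["operand_2"]
        ):
            return schema
    raise ValueError("No applicable metadata schema configuration found")
```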

def extract_variables_values(
-     variables: dict,
-     h5file,
-     config: BackgroundIngestorConfig
+     variables: dict, h5file, config: BackgroundIngestorConfig
) -> dict:
values = {}

@@ -75,56 +39,62 @@ def extract_variables_values(
elif source == "SC":
# build url
url = replace_variables_values(
config[""]["scicat_url"] + variables[variable]["url"],
values
config[""]["scicat_url"] + variables[variable]["url"], values
)
# retrieve value from SciCat
response = requests.get(
url,
-     headers = {
-         "token": config[""]["token"]
-     }
+     headers={"token": config[""]["token"]},
+     timeout=10,  # TODO: decide timeout
)
# extract value
value = response.json()[variables[variable]["field"]]
elif source == "VALUE":
# the value is the one indicated
# there might be some substitution needed
- value = replace_variables_values(
-     variables[variable]["value"],
-     values
- )
- if "operator" in variables[variable].keys() and variables[variable]["operator"]:
+ value = replace_variables_values(variables[variable]["value"], values)
+ if (
+     "operator" in variables[variable].keys()
+     and variables[variable]["operator"]
+ ):
operator = variables[variable]["operator"]
if operator == "join_with_space":
value = ", ".join(value)
else:
raise Exception("Invalid variable source configuration")

value_type = variables[variable]["value_type"]
if value_type == "string":
if value_type == "string":
value = str(value)
elif value_type == "string[]":
value = [str(v) for v in value]
elif value_type == "integer":
value = int(value)
elif value_type == "float":
value = float(value)
elif value_type == "date" and isinstance(value,int):
value = datetime.datetime.fromtimestamp(value).isoformat()
elif value_type == "date" and isinstance(value,str):
elif value_type == "date" and isinstance(value, int):
value = datetime.datetime.fromtimestamp(value, tz=datetime.UTC).isoformat()
elif value_type == "date" and isinstance(value, str):
value = datetime.datetime.fromisoformat(value).isoformat()

values[variable] = value

return values
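To make the branches above concrete, a hypothetical `variables` block that this function could consume is shown below. The key names (`source`, `url`, `field`, `value`, `operator`, `value_type`) come from the visible code; the NXS branch is hidden in this diff, so it is omitted, and all concrete values are invented:

```python
# Hypothetical schema fragment for extract_variables_values; values invented.
example_variables = {
    "proposal_title": {
        "source": "SC",                    # fetched from SciCat over HTTP GET
        "url": "proposals/{proposal_id}",  # {...} filled by replace_variables_values
        "field": "title",                  # key picked out of the JSON response
        "value_type": "string",
    },
    "owners": {
        "source": "VALUE",                 # literal value, with substitutions
        "value": "{pi_names}",
        "operator": "join_with_space",     # applies ", ".join(...) per the code above
        "value_type": "string",
    },
}
# Hypothetical call, mirroring main() below:
# values = extract_variables_values(example_variables, h5file, config)
```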


def prepare_scicat_dataset(schema, values): ...
def create_scicat_dataset(dataset): ...
def create_scicat_origdatablock(
scicat_dataset_pid, nexus_file=None, done_writing_message_file=None
): ...


def main() -> None:
"""Main entry point of the app."""
arg_parser = build_background_ingestor_arg_parser()
arg_namespace = arg_parser.parse_args()
config = build_scicat_background_ingester_config(arg_namespace)
+ ingestion_options = config.ingestion_options
logger = build_logger(config)

# Log the configuration as dictionary so that it is easier to read from the logs
@@ -133,39 +103,35 @@ def main() -> None:
)
logger.info(config.to_dict())

- # load metadata schema configurations
- # list files in schema folders
- schemas = {}
- for schema_file in list_schema_files():
-     with open(schema_file, 'r') as fh:
-         current_schema = json.load(fh)
-         schemas[current_schema['id']] = current_schema
+ # Collect all metadata schema configurations
+ schemas = collect_schemas(ingestion_options.schema_directory)

with exit_at_exceptions(logger, daemon=False):
- nexus_file = pathlib.Path(config.single_run_options.nexus_file)
- logger.info("Nexus file to be ingested : ")
- logger.info(nexus_file)

- done_writing_message_file = pathlib.Path(
-     config.single_run_options.done_writing_message_file
+ logger.info(
+     "Nexus file to be ingested : %s",
+     (nexus_file_path := pathlib.Path(config.single_run_options.nexus_file)),
+ )
+ logger.info(
+     "Done writing message file linked to nexus file : %s",
+     (
+         done_writing_message_file := pathlib.Path(
+             config.single_run_options.done_writing_message_file
+         )
+     ),
+ )
logger.info("Done writing message file linked to nexus file : ")
logger.info(done_writing_message_file)

# open and read done writing message input file
- done_writing_message = json.load(done_writing_message_file.open())
- logger.info(done_writing_message)
+ logger.info(json.load(done_writing_message_file.open()))

- # open nexus file with h5py
- h5file = h5py.File(nexus_file)

- # load instrument metadata configuration
- metadata_schema = select_applicable_schema(nexus_file, h5file, schemas)
+ with h5py.File(nexus_file_path) as h5file:
+     # load instrument metadata configuration
+     metadata_schema = select_applicable_schema(nexus_file_path, h5file, schemas)

- # define variables values
- variables_values = extract_variables_values(
-     metadata_schema['variables'], h5file, config
- )
+     # define variables values
+     variables_values = extract_variables_values(
+         metadata_schema['variables'], h5file, config
+     )

# create b2blake hash of all the files

@@ -175,15 +141,17 @@
)

# create dataset in scicat
- scicat_dataset_pid = create_Scicat_dataset(scicat_dataset)
+ scicat_dataset_pid = create_scicat_dataset(scicat_dataset)

# create and populate scicat origdatablock entry
# with files and hashes previously computed

scicat_origdatablock = create_scicat_origdatablock(
-     scicat_dataset_pid, nexus_file, done_writing_message_file
+     scicat_dataset_pid, nexus_file_path, done_writing_message_file
)

# create origdatablock in scicat
scicat_origdatablock_id = create_scicat_origdatablock(scicat_origdatablock)

# return successful code
return scicat_origdatablock_id
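The `prepare_scicat_dataset`, `create_scicat_dataset`, and `create_scicat_origdatablock` helpers are still `...` stubs at this commit. As a hedged sketch of where `create_scicat_dataset` could be headed, here is a minimal POST against a SciCat backend; the endpoint path and auth header follow common SciCat deployments and are not code from this PR:

```python
# Assumption-laden sketch; the stub in this PR is unimplemented, and the
# endpoint/header conventions below are not confirmed by this diff.
import requests


def create_scicat_dataset(dataset: dict, base_url: str, token: str) -> str:
    """POST a dataset to SciCat and return the PID assigned by the server."""
    response = requests.post(
        f"{base_url}/api/v3/datasets",  # hypothetical endpoint path
        json=dataset,
        headers={"Authorization": f"Bearer {token}"},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["pid"]
```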