Commit

Merge pull request #45 from SciCatProject/schema_selector_refactor

Schema selector refactor

YooSunYoung authored Jul 16, 2024
2 parents ff22d59 + 9746444 commit e256c53
Showing 11 changed files with 421 additions and 221 deletions.
4 changes: 2 additions & 2 deletions config.20240405.json
@@ -29,9 +29,9 @@
   "config_file" : "config.json",
   "verbose" : false,
   "file_log" : false,
-  "log_filepath_prefix" : ".scicat_ingestor_log",
+  "file_log_base_name" : ".scicat_ingestor_log",
   "file_log_timestamp" : false,
-  "log_level" : "INFO",
+  "logging_level" : "INFO",
   "system_log" : false,
   "system_log_facility" : "mail",
   "log_message_prefix" : " SFI: ",
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -28,12 +28,13 @@ requires-python = ">=3.12"
 # Run 'tox -e deps' after making changes here. This will update requirement files.
 # Make sure to list one dependency per line.
 dependencies = [
-    "kafka-python",
+    "confluent_kafka",
     "ess-streaming-data-types",
+    "graypy",
     "h5py",
+    "kafka-python",
     "requests",
-    "rich",
-    "graypy"
+    "rich"
 ]
 
 dynamic = ["version"]
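Editor's note — the new confluent_kafka dependency pairs with the "kafka" options in resources/config.sample.json (next file below). A minimal sketch of how those options could be wired into a consumer; this is not part of the commit, and the broker address, group id, and topic are placeholders — only the option names come from the sample config:

# Minimal sketch, assuming the sample config's kafka options map onto
# librdkafka configuration keys. Placeholders are marked.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "scicat-ingestor",          # placeholder consumer group
    "auto.offset.reset": "earliest",        # mirrors "auto_offset_reset"
    "enable.auto.commit": True,             # mirrors "enable_auto_commit"
})
consumer.subscribe(["wrdn_messages"])       # placeholder topic name
msg = consumer.poll(timeout=1.0)            # None if nothing arrived in time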
100 changes: 52 additions & 48 deletions resources/config.sample.json
@@ -9,69 +9,73 @@
     "ssl_ca_location": "FULL_PATH_TO_CERTIFICATE_FILE",
     "individual_message_commit": true,
     "enable_auto_commit": true,
-    "auto_offset_reset": "earliest"
+    "auto_offset_reset": "earliest",
+    "message_saving_options": {
+      "message_to_file": true,
+      "message_file_extension": "message.json",
+      "message_output": "SOURCE_FOLDER"
+    }
   },
   "user_office": {
     "host": "https://useroffice.host",
     "username": "USERNAME",
     "password": "PASSWORD",
-    "token" : "JWT_TOKEN"
+    "token": "JWT_TOKEN"
   },
   "scicat": {
     "host": "https://scicat.host",
     "username": "USERNAME",
     "password": "PASSWORD",
-    "token" : "JWT_TOKEN"
-  },
-  "graylog": {
-    "host" : "",
-    "port" : "",
-    "facility" : "scicat.ingestor"
+    "token": "JWT_TOKEN"
   },
+  "graylog": {"host": "", "port": "", "facility": "scicat.ingestor"},
   "dataset": {
-    "instrument_id" : "ID_OF_FALLBACK_INSTRUMENT",
-    "instrument" : "FALLBACK_INSTRUMENT_NAME",
-    "default_proposal_id" : "DEFAULT_PROPOSAL_ID",
-    "ownable" : {
-      "ownerGroup": "DEFAULT_OWNER_GROUP",
-      "accessGroups": ["ACCESS_GROUP_1"]
+    "instrument_id": "ID_OF_FALLBACK_INSTRUMENT",
+    "instrument": "FALLBACK_INSTRUMENT_NAME",
+    "default_proposal_id": "DEFAULT_PROPOSAL_ID",
+    "ownable": {
+      "ownerGroup": "DEFAULT_OWNER_GROUP",
+      "accessGroups": ["ACCESS_GROUP_1"]
     }
   },
   "options": {
-    "config_file" : "config.json",
-    "schemas_folders" : "schemas",
-    "verbose" : false,
-    "file_log" : false,
-    "file_log_base_name" : "scicat_ingestor_log",
-    "file_log_timestamp" : false,
-    "loggin_level" : "INFO",
-    "system_log" : false,
-    "system_log_facility" : "mail",
-    "log_message_prefix" : " SFI ",
-    "check_by_job_id" : true,
+    "config_file": "config.json",
+    "verbose": false,
+    "file_log": false,
+    "file_log_base_name": "scicat_ingestor_log",
+    "file_log_timestamp": false,
+    "logging_level": "INFO",
+    "system_log": false,
+    "system_log_facility": "mail",
+    "log_message_prefix": "SFI",
+    "check_by_job_id": true,
     "pyscicat": null,
-    "graylog" : false,
-    "dry_run" : false,
-    "retrieve_instrument_from" : "default",
-    "instrument_position_in_file_path" : 3,
-    "message_to_file" : true,
-    "message_file_extension" : ".message.json",
-    "message_output" : "SOURCE_FOLDER",
-    "hdf_structure_in_metadata" : false, // not sure if needed
-    "hdf_structure_to_file" : true, // not sure if needed
-    "hdf_structure_file_extension" : ".hdf_structure.json", // not sure if needed
-    "hdf_structure_output" : "SOURCE_FOLDER", // not sure if needed
-    "local_output_folder" : "./data",
-    "compute_files_stats" : true,
-    "compute_files_hash" : true,
-    "file_hash_algorithm" : "blake2b",
-    "save_hash_in_file" : true,
-    "hash_file_extension" : "b2b",
-    "ingestor_files_folder": "ingestor",
-    "dataset_pid_prefix" : "20.500.12269",
-    "force_dataset_pid" : true, // not sure if needed
-    "use_job_id_as_dataset" : true,
-    "beautify_metadata_keys" : false,
-    "metadata_levels_separator": " " // not sure if needed
+    "graylog": false
   },
+  "ingestion_options": {
+    "dry_run": false,
+    "schemas_directory": "schemas",
+    "retrieve_instrument_from": "default",
+    "instrument_position_in_file_path": 3,
+    "file_handling_options": {
+      "hdf_structure_in_metadata": false,
+      "hdf_structure_to_file": true,
+      "hdf_structure_file_extension": ".hdf_structure.json",
+      "hdf_structure_output": "SOURCE_FOLDER",
+      "local_output_directory": "data",
+      "compute_file_stats": true,
+      "compute_file_hash": true,
+      "file_hash_algorithm": "blake2b",
+      "save_file_hash": true,
+      "hash_file_extension": "b2b",
+      "ingestor_files_directory": "ingestor"
+    },
+    "dataset_options": {
+      "force_dataset_pid": true,
+      "dataset_pid_prefix": "20.500.12269",
+      "use_job_id_as_dataset_id": true,
+      "beautify_metadata_keys": false,
+      "metadata_levels_separator": " "
+    }
+  }
 }
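Editor's note — the new file_handling_options group declares "file_hash_algorithm": "blake2b", "save_file_hash": true, and "hash_file_extension": "b2b". A hedged sketch of what that combination implies (not the ingestor's actual implementation; file and directory names are placeholders):

# Minimal sketch, assuming the hash is written next to the other ingestor
# files with the configured "b2b" extension.
import hashlib
import pathlib

def save_file_hash(data_file: pathlib.Path, output_dir: pathlib.Path) -> pathlib.Path:
    hasher = hashlib.blake2b()
    with data_file.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):  # 1 MiB chunks
            hasher.update(chunk)
    hash_file = output_dir / (data_file.name + ".b2b")
    hash_file.write_text(hasher.hexdigest())
    return hash_file

# e.g. save_file_hash(pathlib.Path("run_0001.nxs"), pathlib.Path("ingestor"))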
142 changes: 55 additions & 87 deletions src/background_ingestor.py
@@ -4,63 +4,27 @@
 import datetime
 import json
 import pathlib
-import h5py
-import os
 
+import h5py
 import requests
 from scicat_configuration import (
+    BackgroundIngestorConfig,
     build_background_ingestor_arg_parser,
-    build_scicat_background_ingester_config, BackgroundIngestorConfig,
+    build_scicat_background_ingester_config,
 )
 from scicat_logging import build_logger
+from scicat_metadata import collect_schemas, select_applicable_schema
 from system_helpers import exit_at_exceptions
 
 
 def replace_variables_values(url: str, values: dict) -> str:
     for key, value in values.items():
         url = url.replace("{" + key + "}", str(value))
     return url
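Editor's note — replace_variables_values survives the refactor unchanged: it fills "{placeholder}" tokens in a URL template from a values dict. A quick illustrative call (the values are made up):

# Illustrative only: each "{key}" is replaced with the stringified value.
replace_variables_values("/proposals/{proposal_id}", {"proposal_id": 123})
# -> "/proposals/123"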
 
-def list_schema_files(schemas_folder):
-    """
-    return the list of the metadata schema configuration available in the folder provided
-    valid metadata schema configuration ends with imsc.json
-    imsc = ingestor metadata schema configuration
-    """
-    return [file for file in os.listdir(schemas_folder) if file.endswith("imsc.json") and not file.startswith(".")]
-
-
-def select_applicable_schema(nexus_file, nxs, schemas):
-    """
-    This function evaluate which metadata schema configuration is applicable to this file.
-    Order of the schemas matters and first schema that is suitable is selected.
-    """
-    for schema in schemas.values():
-        if isinstance(schema['selector'], str):
-            selector_list = schema['selector'].split(':')
-            selector = {
-                "operand_1": selector_list[0],
-                "operation": selector_list[1],
-                "operand_2": selector_list[2],
-            }
-        elif isinstance(schema['selector'], dict):
-            selector = schema['selector']
-        else:
-            raise Exception("Invalid type for schema selector")
-
-        if selector['operand_1'] in [
-            "filename",
-            "data_file",
-            "nexus_file",
-            "data_file_name",
-        ]:
-            selector['operand_1_value'] = nexus_file
-
-        if selector['operation'] == "starts_with":
-            if selector['operand_1_value'].startswith(selector['operand_2']):
-                return schema
-
-    raise Exception("No applicable metadata schema configuration found!!")
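Editor's note — these two helpers were not deleted outright; per the new import above, they moved into scicat_metadata as collect_schemas and select_applicable_schema. For orientation, a hedged sketch of the selector format the removed code parsed: the string form splits on ":" into operand_1 / operation / operand_2. The field names ("id", "selector", "variables") and the "filename:starts_with:..." shape come from the code above; the concrete values are invented:

# Hypothetical *.imsc.json content, shown as a Python dict. This schema
# would match any nexus file whose path starts with "/ess/data/coda".
example_schema = {
    "id": "example-schema-id",                       # invented identifier
    "selector": "filename:starts_with:/ess/data/coda",
    "variables": {},                                 # variable definitions go here
}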

 def extract_variables_values(
-    variables: dict,
-    h5file,
-    config: BackgroundIngestorConfig
+    variables: dict, h5file, config: BackgroundIngestorConfig
 ) -> dict:
     values = {}
 
@@ -75,56 +39,62 @@ def extract_variables_values(
         elif source == "SC":
             # build url
             url = replace_variables_values(
-                config[""]["scicat_url"] + variables[variable]["url"],
-                values
+                config[""]["scicat_url"] + variables[variable]["url"], values
             )
             # retrieve value from SciCat
             response = requests.get(
                 url,
-                headers = {
-                    "token": config[""]["token"]
-                }
+                headers={"token": config[""]["token"]},
+                timeout=10,  # TODO: decide timeout
             )
             # extract value
             value = response.json()[variables[variable]["field"]]
         elif source == "VALUE":
             # the value is the one indicated
             # there might be some substitution needed
-            value = replace_variables_values(
-                variables[variable]["value"],
-                values
-            )
-            if "operator" in variables[variable].keys() and variables[variable]["operator"]:
+            value = replace_variables_values(variables[variable]["value"], values)
+            if (
+                "operator" in variables[variable].keys()
+                and variables[variable]["operator"]
+            ):
                 operator = variables[variable]["operator"]
                 if operator == "join_with_space":
                     value = ", ".join(value)
         else:
             raise Exception("Invalid variable source configuration")
 
         value_type = variables[variable]["value_type"]
         if value_type == "string":
             value = str(value)
         elif value_type == "string[]":
             value = [str(v) for v in value]
         elif value_type == "integer":
             value = int(value)
         elif value_type == "float":
             value = float(value)
-        elif value_type == "date" and isinstance(value,int):
-            value = datetime.datetime.fromtimestamp(value).isoformat()
-        elif value_type == "date" and isinstance(value,str):
+        elif value_type == "date" and isinstance(value, int):
+            value = datetime.datetime.fromtimestamp(value, tz=datetime.UTC).isoformat()
+        elif value_type == "date" and isinstance(value, str):
            value = datetime.datetime.fromisoformat(value).isoformat()
 
         values[variable] = value
 
     return values
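Editor's note — extract_variables_values walks a schema's "variables" section. The visible branches handle an "SC" source (a SciCat GET with URL templating, reading one response "field") and a "VALUE" source (a literal with optional substitution and an "operator" — note that "join_with_space" actually joins with ", "), each coerced via "value_type". A hedged sketch of what one such section could look like; the file-based "NXS" source and its "path" key are assumptions (that branch is collapsed out of this hunk), and all concrete paths and URLs are invented:

# Hypothetical "variables" section of a metadata schema, as a Python dict.
example_variables = {
    "proposal_id": {           # assumed HDF5-backed source, not shown in this hunk
        "source": "NXS",
        "path": "/entry/experiment_identifier",
        "value_type": "string",
    },
    "pi_email": {              # "SC": GET <scicat_url><url>, then read one field
        "source": "SC",
        "url": "proposals/{proposal_id}",  # "{proposal_id}" filled from earlier values
        "field": "pi_email",
        "value_type": "string",
    },
    "run_title": {             # "VALUE": a literal, optionally post-processed
        "source": "VALUE",
        "value": "Run for proposal {proposal_id}",
        "value_type": "string",
    },
}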
 
 
+def prepare_scicat_dataset(schema, values): ...
+def create_scicat_dataset(dataset): ...
+def create_scicat_origdatablock(
+    scicat_dataset_pid, nexus_file=None, done_writing_message_file=None
+): ...
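Editor's note — these are placeholder stubs; main() below calls them as if implemented. As orientation only, a hedged sketch of what create_scicat_dataset might eventually do, assuming SciCat's /api/v3/Datasets endpoint, a "pid" field in the response, and the token-header style used earlier in this file; none of this is in the commit:

# Hypothetical implementation sketch, not part of this commit.
import requests

def create_scicat_dataset_sketch(dataset: dict, scicat_url: str, token: str) -> str:
    """POST a dataset to SciCat and return its pid (assumed response field)."""
    response = requests.post(
        scicat_url + "/api/v3/Datasets",
        json=dataset,
        headers={"token": token},  # mirrors the header style used above
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["pid"]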
 
 
 def main() -> None:
     """Main entry point of the app."""
     arg_parser = build_background_ingestor_arg_parser()
     arg_namespace = arg_parser.parse_args()
     config = build_scicat_background_ingester_config(arg_namespace)
+    ingestion_options = config.ingestion_options
     logger = build_logger(config)
 
     # Log the configuration as dictionary so that it is easier to read from the logs
@@ -133,39 +103,35 @@ def main() -> None:
     )
     logger.info(config.to_dict())
 
-    # load metadata schema configurations
-    # list files in schema folders
-    schemas = {}
-    for schema_file in list_schema_files():
-        with open(schema_file, 'r') as fh:
-            current_schema = json.load(fh)
-        schemas[current_schema['id']] = current_schema
+    # Collect all metadata schema configurations
+    schemas = collect_schemas(ingestion_options.schema_directory)
 
     with exit_at_exceptions(logger, daemon=False):
-        nexus_file = pathlib.Path(config.single_run_options.nexus_file)
-        logger.info("Nexus file to be ingested : ")
-        logger.info(nexus_file)
-
-        done_writing_message_file = pathlib.Path(
-            config.single_run_options.done_writing_message_file
-        )
-        logger.info("Done writing message file linked to nexus file : ")
-        logger.info(done_writing_message_file)
+        logger.info(
+            "Nexus file to be ingested : %s",
+            (nexus_file_path := pathlib.Path(config.single_run_options.nexus_file)),
+        )
+        logger.info(
+            "Done writing message file linked to nexus file : %s",
+            (
+                done_writing_message_file := pathlib.Path(
+                    config.single_run_options.done_writing_message_file
+                )
+            ),
+        )
 
         # open and read done writing message input file
-        done_writing_message = json.load(done_writing_message_file.open())
-        logger.info(done_writing_message)
+        logger.info(json.load(done_writing_message_file.open()))
 
         # open nexus file with h5py
-        h5file = h5py.File(nexus_file)
-
-        # load instrument metadata configuration
-        metadata_schema = select_applicable_schema(nexus_file, h5file, schemas)
+        with h5py.File(nexus_file_path) as h5file:
+            # load instrument metadata configuration
+            metadata_schema = select_applicable_schema(nexus_file_path, h5file, schemas)
 
-        # define variables values
-        variables_values = extract_variables_values(
-            metadata_schema['variables'], h5file, config
-        )
+            # define variables values
+            variables_values = extract_variables_values(
+                metadata_schema['variables'], h5file, config
+            )
 
         # create b2blake hash of all the files
 
@@ -175,15 +141,17 @@ def main() -> None:
         )
 
         # create dataset in scicat
-        scicat_dataset_pid = create_Scicat_dataset(scicat_dataset)
+        scicat_dataset_pid = create_scicat_dataset(scicat_dataset)
 
         # create and populate scicat origdatablock entry
         # with files and hashes previously computed
 
         scicat_origdatablock = create_scicat_origdatablock(
-            scicat_dataset_pid, nexus_file, done_writing_message_file
+            scicat_dataset_pid, nexus_file_path, done_writing_message_file
         )
 
         # create origdatablock in scicat
         scicat_origdatablock_id = create_scicat_origdatablock(scicat_origdatablock)
 
         # return successful code
         return scicat_origdatablock_id