Merge branch 'main' into fix_from_testing_20241031
YooSunYoung authored Nov 1, 2024
2 parents dda7237 + 344362e commit 1474c60
Showing 11 changed files with 296 additions and 62 deletions.
1 change: 1 addition & 0 deletions resources/base.imsc.json.example
@@ -1,4 +1,5 @@
{
"order": 1,
"id": "c5bed39a-4379-11ef-ba5a-ffbc783163b6",
"name" : "Generic metadata schema",
"instrument" : "",
1 change: 1 addition & 0 deletions resources/coda.imsc.json.example
@@ -1,4 +1,5 @@
{
"order": 1,
"id" : "715ce7ba-3f91-11ef-932f-37a5c6fd60b1",
"name" : "Coda Metadata Schema",
"instrument": "coda",
9 changes: 6 additions & 3 deletions resources/config.sample.json
@@ -2,7 +2,6 @@
"config_file": "",
"id": "",
"dataset": {
"check_by_job_id": true,
"allow_dataset_pid": true,
"generate_dataset_pid": false,
"dataset_pid_prefix": "20.500.12269",
@@ -17,6 +16,9 @@
"dry_run": false,
"offline_ingestor_executable": "background_ingestor",
"schemas_directory": "schemas",
"check_if_dataset_exists_by_pid": true,
"check_if_dataset_exists_by_metadata": true,
"check_if_dataset_exists_by_metadata_key": "job_id",
"file_handling": {
"compute_file_stats": true,
"compute_file_hash": true,
@@ -25,7 +27,8 @@
"hash_file_extension": "b2b",
"ingestor_files_directory": "../ingestor",
"message_to_file": true,
"message_file_extension": "message.json"
"message_file_extension": "message.json",
"use_full_file_path": false
}
},
"kafka": {
@@ -57,7 +60,7 @@
"scicat": {
"host": "https://scicat.host",
"token": "JWT_TOKEN",
"headers": {},
"additional_headers": {},
"timeout": 0,
"stream": true,
"verify": false,
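To make the intent of the three new check_if_dataset_exists_* switches concrete, here is a minimal sketch (not part of this commit) of how an ingestor might combine them with the check_dataset_by_pid and check_dataset_by_metadata helpers added in scicat_communication.py below; the function name and wiring are illustrative assumptions, only the option and helper names come from the diff.

import logging

from scicat_communication import check_dataset_by_metadata, check_dataset_by_pid
from scicat_configuration import IngestionOptions, SciCatOptions


def dataset_already_exists(
    dataset_pid: str,
    job_id: str,
    ingestion: IngestionOptions,
    scicat: SciCatOptions,
    logger: logging.Logger,
) -> bool:
    # Hypothetical glue code: each switch enables one kind of existence check.
    if ingestion.check_if_dataset_exists_by_pid and check_dataset_by_pid(
        dataset_pid, scicat, logger
    ):
        return True
    if ingestion.check_if_dataset_exists_by_metadata:
        key = ingestion.check_if_dataset_exists_by_metadata_key  # "job_id" by default
        return check_dataset_by_metadata(key, job_id, scicat, logger)
    return False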
1 change: 1 addition & 0 deletions resources/dream.imsc.json.example
@@ -1,4 +1,5 @@
{
"order": 1,
"id" : "72a991ee-437a-11ef-8fd2-1f95660accb7",
"name" : "dream Metadata Schema",
"instrument": "dream",
1 change: 1 addition & 0 deletions resources/loki.imsc.json.example
@@ -1,4 +1,5 @@
{
"order": 1,
"id" : "891322f6-437a-11ef-980a-7bdc756bd0b3",
"name" : "Loki Metadata Schema",
"instrument": "loki",
96 changes: 86 additions & 10 deletions src/scicat_communication.py
@@ -1,6 +1,10 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
import json
import logging
from dataclasses import asdict
from typing import Any
from urllib.parse import quote, urljoin

import requests
from scicat_configuration import SciCatOptions
@@ -14,7 +18,7 @@ def retrieve_value_from_scicat(
field_name: str, # variable_recipe["field"]
) -> str:
response: dict = requests.get(
scicat_endpoint_url, headers=config.headers, timeout=config.timeout
scicat_endpoint_url, headers=config.additional_headers, timeout=config.timeout
).json()
return response[field_name] if field_name else response

@@ -23,6 +27,12 @@ class ScicatDatasetAPIError(Exception):
pass


def _get_from_scicat(
*, url: str, headers: dict, timeout: int, stream: bool, verify: bool
) -> requests.Response:
return requests.get(url, headers=headers, timeout=timeout)


def _post_to_scicat(*, url: str, posting_obj: dict, headers: dict, timeout: int):
return requests.request(
method="POST",
@@ -45,7 +55,7 @@ def create_scicat_dataset(
response = _post_to_scicat(
url=config.urls.datasets,
posting_obj=dataset,
headers=config.headers,
headers=config.additional_headers,
timeout=config.timeout,
)
result: dict = response.json()
@@ -77,7 +87,7 @@ def create_scicat_origdatablock(
response = _post_to_scicat(
url=config.urls.origdatablocks,
posting_obj=origdatablock,
headers=config.headers,
headers=config.additional_headers,
timeout=config.timeout,
)
result: dict = response.json()
@@ -98,13 +108,79 @@ def create_scicat_origdatablock(
return result


def render_full_url(
url: str,
config: SciCatOptions,
) -> str:
def render_full_url(url: str, config: SciCatOptions) -> str:
urls = asdict(config.urls)
if not url.startswith("http://") and not url.startswith("https://"):
for endpoint in config.urls.__dict__.keys():
for endpoint in urls.keys():
if url.startswith(endpoint):
url = url.replace(endpoint, config.urls.__getattribute__(endpoint))
break
return url.replace(endpoint, urls[endpoint])
return url


def check_dataset_by_pid(
pid: str, config: SciCatOptions, logger: logging.Logger
) -> bool:
response = _get_from_scicat(
url=urljoin(config.host_address, f"datasets/{quote(pid)}"),
headers=config.additional_headers,
timeout=config.timeout,
stream=config.stream,
verify=config.verify,
)
dataset_exists: bool
if not response.ok:
logger.error(
"Failed to check dataset existence by pid with status code: %s. "
"Error message from scicat backend: \n%s\n"
"Assuming the dataset does not exist.",
response.status_code,
response.reason,
)
dataset_exists = False
elif response.json():
logger.info("Dataset with pid %s exists.", pid)
dataset_exists = True
else:
logger.info("Dataset with pid %s does not exist.", pid)
dataset_exists = False

return dataset_exists


def check_dataset_by_metadata(
metadata_key: str,
metadata_value: Any,
config: SciCatOptions,
logger: logging.Logger,
) -> bool:
metadata_dict = {f"scientificMetadata.{metadata_key}.value": metadata_value}
filter_string = '?filter={"where":' + json.dumps(metadata_dict) + "}"
url = urljoin(config.host_address, "datasets") + filter_string
logger.info("Checking if dataset exists by metadata with url: %s", url)
response = _get_from_scicat(
url=url,
headers=config.additional_headers,
timeout=config.timeout,
stream=config.stream,
verify=config.verify,
)
dataset_exists: bool
if not response.ok:
logger.error(
"Failed to check dataset existence by metadata key %s with status code: %s "
"Error message from scicat backend: \n%s\n"
"Assuming the dataset does not exist.",
metadata_key,
response.status_code,
response.reason,
)
dataset_exists = False
elif response.json():
logger.info("Retrieved %s dataset(s) from SciCat", len(response.json()))
logger.info("Dataset with metadata %s exists.", metadata_dict)
dataset_exists = True
else:
logger.info("Dataset with metadata %s does not exist.", metadata_dict)
dataset_exists = False

return dataset_exists
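For reference, a runnable sketch (not part of the commit) of the filter URL that check_dataset_by_metadata builds for the default job_id key; the host and the job id value are placeholders:

import json
from urllib.parse import urljoin

metadata_dict = {"scientificMetadata.job_id.value": "1234"}
filter_string = '?filter={"where":' + json.dumps(metadata_dict) + "}"
url = urljoin("https://scicat.host/", "datasets") + filter_string
print(url)
# -> https://scicat.host/datasets?filter={"where":{"scientificMetadata.job_id.value": "1234"}}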
35 changes: 20 additions & 15 deletions src/scicat_configuration.py
@@ -212,6 +212,9 @@ class IngestionOptions:
dry_run: bool = False
offline_ingestor_executable: str = "background_ingestor"
schemas_directory: str = "schemas"
check_if_dataset_exists_by_pid: bool = True
check_if_dataset_exists_by_metadata: bool = True
check_if_dataset_exists_by_metadata_key: str = "job_id"
file_handling: FileHandlingOptions = field(default_factory=FileHandlingOptions)


@@ -221,7 +224,6 @@ def default_access_groups() -> list[str]:

@dataclass(kw_only=True)
class DatasetOptions:
check_by_job_id: bool = True
allow_dataset_pid: bool = True
generate_dataset_pid: bool = False
dataset_pid_prefix: str = "20.500.12269"
@@ -251,7 +253,7 @@ class ScicatEndpoints:
class SciCatOptions:
host: str = "https://scicat.host"
token: str = "JWT_TOKEN"
headers: dict = field(default_factory=dict)
additional_headers: dict = field(default_factory=dict)
timeout: int = 0
stream: bool = True
verify: bool = False
@@ -260,22 +262,25 @@ class SciCatOptions:
@property
def urls(self) -> _ScicatAPIURLs:
return _ScicatAPIURLs(
datasets=urljoin(self.host, self.api_endpoints.datasets),
proposals=urljoin(self.host, self.api_endpoints.proposals),
origdatablocks=urljoin(self.host, self.api_endpoints.origdatablocks),
instruments=urljoin(self.host, self.api_endpoints.instruments),
datasets=urljoin(self.host_address, self.api_endpoints.datasets),
proposals=urljoin(self.host_address, self.api_endpoints.proposals),
origdatablocks=urljoin(
self.host_address, self.api_endpoints.origdatablocks
),
instruments=urljoin(self.host_address, self.api_endpoints.instruments),
)

@classmethod
def from_configurations(cls, config: dict) -> "SciCatOptions":
"""Create SciCatOptions from a dictionary."""
options = cls(**config)
options.host = options.host.removesuffix('/') + "/"
options.headers = {
**options.headers,
**{"Authorization": f"Bearer {options.token}"},
@property
def headers(self) -> dict:
return {
**self.additional_headers,
**{"Authorization": f"Bearer {self.token}"},
}
return options

@property
def host_address(self) -> str:
"""Return the host address ready to be used."""
return self.host.removesuffix('/') + "/"


@dataclass(kw_only=True)
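Since headers and host_address are now computed properties rather than values patched onto the instance at construction time, here is a short sketch (not part of the commit) of what they return; the describe helper is hypothetical and options stands for an instance parsed from the sample config above:

from scicat_configuration import SciCatOptions


def describe(options: SciCatOptions) -> None:
    print(options.host)          # "https://scicat.host", exactly as configured
    print(options.host_address)  # "https://scicat.host/", trailing slash ensured on access
    print(options.headers)       # additional_headers plus {"Authorization": "Bearer <token>"}
    # The token is merged lazily on every access and never written back into
    # options.additional_headers.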
28 changes: 26 additions & 2 deletions src/scicat_dataset.py
@@ -60,6 +60,15 @@ def to_date(value: Any) -> str | None:


def to_dict(value: Any) -> dict:
if isinstance(value, str):
result = ast.literal_eval(value)
if isinstance(result, dict):
return result
else:
raise ValueError(
"Invalid value. Must be able to convert to a dictionary. Got ", value
)

return dict(value)


@@ -93,10 +102,20 @@ def convert_to_type(input_value: Any, dtype_desc: str) -> Any:
"join_with_space": lambda value: ", ".join(
ast.literal_eval(value) if isinstance(value, str) else value
),
"evaluate": lambda value: eval(value),
# "evaluate": lambda value: ast.literal_eval(value),
# We are not adding the evaluate function here since
# ``evaluate`` function should be avoided if possible.
# It might seem easy to use, but it is very easy to break
# when the input is not as expected.
# It is better to use the specific converters for the types.
# However, if it is the only way to go, you can add it here.
# Please add a comment to explain why it is needed.
"filename": lambda value: os.path.basename(value),
"dirname": lambda value: os.path.dirname(value),
"dirname-2": lambda value: os.path.dirname(os.path.dirname(value)),
"getitem": lambda value, key: value[
key
], # The only operator that takes an argument
}
)

@@ -155,7 +174,12 @@ def extract_variables_values(
if isinstance(value, str)
else value
)
value = _get_operator(variable_recipe.operator)(value)
_operator = _get_operator(variable_recipe.operator)
if variable_recipe.field:
value = _operator(value, variable_recipe.field)
else:
value = _operator(value)

else:
raise Exception("Invalid variable source: ", source)
variable_map[variable_name] = convert_to_type(value, variable_recipe.value_type)
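The new string handling in to_dict and the getitem operator are easiest to see in isolation; below is a standalone sketch (not part of the commit) mirroring the converter logic above, with made-up sample values:

import ast


def to_dict(value):
    # Same logic as the converter above: parse strings safely, reject non-dicts.
    if isinstance(value, str):
        result = ast.literal_eval(value)
        if isinstance(result, dict):
            return result
        raise ValueError(
            "Invalid value. Must be able to convert to a dictionary. Got ", value
        )
    return dict(value)


def getitem(value, key):
    # The only operator that takes an argument (the ``field`` of the recipe).
    return value[key]


assert to_dict("{'job_id': '1234'}") == {"job_id": "1234"}
assert getitem({"job_id": "1234"}, "job_id") == "1234"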
12 changes: 11 additions & 1 deletion src/scicat_metadata.py
@@ -67,6 +67,8 @@ class ValueMetadataVariable(MetadataSchemaVariable):

operator: str = ""
value: str
field: str | None = None
# We only allow one field(argument) for now


@dataclass(kw_only=True)
@@ -139,6 +141,12 @@ def from_file(cls, schema_file_name: pathlib.Path) -> "MetadataSchema":


def render_variable_value(var_value: str, variable_registry: dict) -> str:
# If it is only one variable, then it is a simple replacement
if (var_key := var_value.removesuffix(">").removeprefix("<")) in variable_registry:
return variable_registry[var_key]

# If it is a complex variable, then it is a combination of variables
# similar to f-string in python
for reg_var_name, reg_var_value in variable_registry.items():
var_value = var_value.replace("<" + reg_var_name + ">", str(reg_var_value))

@@ -159,7 +167,9 @@ def collect_schemas(dir_path: pathlib.Path) -> OrderedDict[str, MetadataSchema]:
MetadataSchema.from_file(schema_file_path)
for schema_file_path in list_schema_file_names(dir_path)
],
key=lambda schema: (schema.order, schema.name),
key=lambda schema: (schema.order, schema.name.capitalize()),
# name is capitalized to make sure that the order is
# alphabetically sorted in a non-case-sensitive way
)
schemas = OrderedDict()
for metadata_schema in metadata_schemas:
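Lastly, a small illustration (not part of the commit) of the two rendering paths in render_variable_value; the registry content is made up, and the composite result assumes the unchanged tail of the function simply returns the substituted string:

from scicat_metadata import render_variable_value

registry = {"run_number": 42, "facility": "ESS"}

# Exactly one registered variable: the registered value is returned as-is, type preserved.
print(render_variable_value("<run_number>", registry))            # -> 42

# A composite template: every <name> is substituted as a string, f-string style.
print(render_variable_value("<facility>-run-<run_number>", registry))  # -> ESS-run-42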
