Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BDRSPS-1117: Move registries out of the classes they register #367

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion abis_mapping/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Local
from . import plugins # Ensure plugins are loaded
from .base import loader # Dynamically load the template mappers
from .base.mapper import get_mapper, get_mappers
from .base.mapper import register_mapper, get_mapper, registered_ids
54 changes: 26 additions & 28 deletions abis_mapping/base/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@


# Typing
from typing import Any, Iterator, Optional, final
from collections.abc import Iterator, Set
from typing import Any, Final, Optional, final


# Constants
Expand All @@ -31,9 +32,6 @@
class ABISMapper(abc.ABC):
"""ABIS Mapper Base Class"""

# ABIS Mapper Registry
registry: dict[str, type["ABISMapper"]] = {}

# Default Dataset Metadata
DATASET_DEFAULT_NAME = "Example Dataset"
DATASET_DEFAULT_DESCRIPTION = "Example Dataset by Gaia Resources"
Expand Down Expand Up @@ -500,21 +498,6 @@ def fields(cls) -> dict[str, models.schema.Field]:
# Return dictionary of fields
return {f.name: f for f in schema.fields}

@final
@classmethod
def register_mapper(
cls,
mapper: type["ABISMapper"],
) -> None:
"""Registers a concrete ABIS Mapper with the Base Class

Args:
mapper (type[ABISMapper]): Mapper to be registered.
"""
# Register the mapper with its template id
template_id = mapper.metadata()["id"]
cls.registry[template_id] = mapper

@final
def root_dir(self) -> pathlib.Path:
"""Returns the root directory for this Template.
Expand All @@ -529,25 +512,40 @@ def root_dir(self) -> pathlib.Path:
return pathlib.Path(file_path).parent


# Registry for ABIS Mappers.
_registry: Final[dict[str, type[ABISMapper]]] = {}


def register_mapper(mapper: type[ABISMapper]) -> None:
"""Registers a concrete ABIS Mapper

Args:
mapper: Mapper class to be registered.
"""
# Register the mapper with its template id
template_id = mapper.metadata()["id"]
_registry[template_id] = mapper


def get_mapper(template_id: str) -> Optional[type[ABISMapper]]:
"""Retrieves ABIS Mapper class for the specified template ID.

Args:
template_id (str): Template ID to retrieve the mapper for.
template_id: Template ID to retrieve the mapper for.

Returns:
Optional[type[ABISMapper]]: ABIS mapper class associated with the
specified template ID if found, otherwise `None`.
ABIS mapper class associated with the specified template ID if found, otherwise `None`.
"""
# Retrieve and return the mapper
return ABISMapper.registry.get(template_id)
return _registry.get(template_id)


def get_mappers() -> dict[str, type[ABISMapper]]:
"""Retrieves the full registry of ABIS Mappers.
def registered_ids() -> Set[str]:
"""Retrieves a set of the registered ABIS Mappers' template IDs.

Returns:
dict[str, type[ABISMapper]]: Dictionary of template ID to ABIS Mapper.
Set of the template IDs there are registered ABIS Mappers for.
"""
# Retrieve and return the mappers
return ABISMapper.registry
# Return the set of template IDs.
# Do not return the _registry dict itself to reduce chance it is mutated outside this module.
return _registry.keys()
28 changes: 28 additions & 0 deletions abis_mapping/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ def get_vocab(self, name: str | None = None) -> Type[utils.vocabs.Vocabulary]:

return utils.vocabs.get_vocab(self.vocabularies[0])

def get_flexible_vocab(
self,
name: str | None = None,
) -> Type[utils.vocabs.FlexibleVocabulary]:
"""Retrieves the flexible vocab for the field.

Args:
name: The name of the vocab to retrieve. Will return first flexible vocab if not provided.

Returns:
Returns FlexibleVocabulary for the field.

Raises:
ValueError: If name is not within the vocabularies field,
or if the field has no flexible vocabs.
"""
all_vocabs = (utils.vocabs.get_vocab(name) for name in self.vocabularies)
try:
return next(
vocab_class
for vocab_class in all_vocabs
if issubclass(vocab_class, utils.vocabs.FlexibleVocabulary)
and (name is None or vocab_class.vocab_id == name)
)
except StopIteration:
name_note = "" if name is None else f" '{name}'"
raise ValueError(f"Flexible vocab{name_note} not found for field {self.name}") from None

@property
def publishable_vocabularies(self) -> list[str]:
"""Returns a list of only those vocabularies that are publishable.
Expand Down
4 changes: 2 additions & 2 deletions abis_mapping/models/spatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def original_datum_uri(self) -> rdflib.URIRef:
vocab = utils.vocabs.get_vocab("GEODETIC_DATUM")
try:
# Init with dummy graph and return corresponding URI
return vocab(graph=rdflib.Graph()).get(self.original_datum_name)
return vocab().get(self.original_datum_name)
except utils.vocabs.VocabularyError as exc:
raise GeometryError(
f"CRS {self.original_datum_name} is " "not defined for the GEODETIC_DATUM fixed vocabulary."
Expand Down Expand Up @@ -144,7 +144,7 @@ def transformer_datum_uri(self) -> rdflib.URIRef:

try:
# Init with dummy graph and return corresponding uri
return vocab(graph=rdflib.Graph()).get(default_crs)
return vocab().get(default_crs)
except utils.vocabs.VocabularyError as exc:
raise GeometryError(
f"Default CRS {default_crs} is not defined for the GEODETIC_DATUM fixed vocabulary."
Expand Down
49 changes: 23 additions & 26 deletions abis_mapping/templates/incidental_occurrence_data_v3/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ def apply_mapping_row(
uri=conservation_authority_value,
conservation_authority=conservation_authority,
graph=graph,
base_iri=base_iri,
)

# Add conservation Authority Collection
Expand Down Expand Up @@ -1128,7 +1127,7 @@ def add_observation_scientific_name(
date_identified: models.temporal.Timestamp = row["dateIdentified"] or row["eventDateStart"]

# Retrieve vocab for field
vocab = self.fields()["identificationMethod"].get_vocab()
vocab = self.fields()["identificationMethod"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["identificationMethod"])
Expand Down Expand Up @@ -1197,7 +1196,7 @@ def add_observation_verbatim_id(
foi = sample_specimen if has_specimen(row) else provider_record_id_occurrence

# Retrieve vocab for field
vocab = self.fields()["identificationMethod"].get_vocab()
vocab = self.fields()["identificationMethod"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["identificationMethod"])
Expand Down Expand Up @@ -1412,7 +1411,7 @@ def add_id_qualifier_value(
graph.add((uri, rdflib.RDFS.label, rdflib.Literal(id_qualifier)))

# Retrieve vocab for field
vocab = self.fields()["identificationQualifier"].get_vocab()
vocab = self.fields()["identificationQualifier"].get_flexible_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(id_qualifier)
# Identification Qualifier Value
Expand Down Expand Up @@ -1831,7 +1830,7 @@ def add_sample_specimen(
return

# Retrieve vocab for field
vocab = self.fields()["kingdom"].get_vocab("KINGDOM_SPECIMEN")
vocab = self.fields()["kingdom"].get_flexible_vocab("KINGDOM_SPECIMEN")

# Retrieve Vocab or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["kingdom"])
Expand Down Expand Up @@ -2002,7 +2001,7 @@ def add_taxon_rank_value(
graph.add((uri, rdflib.RDFS.label, rdflib.Literal(taxon_rank)))

# Retrieve vocab for field
vocab = self.fields()["taxonRank"].get_vocab()
vocab = self.fields()["taxonRank"].get_flexible_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(taxon_rank)
# Taxon Rank Value
Expand Down Expand Up @@ -2294,7 +2293,7 @@ def add_organism_quantity_value(
return

# Retrieve vocab for field
vocab = self.fields()["organismQuantityType"].get_vocab()
vocab = self.fields()["organismQuantityType"].get_flexible_vocab()

# Get term or create on the fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(organism_qty_type)
Expand Down Expand Up @@ -2366,7 +2365,7 @@ def add_habitat_value(
graph.add((uri, rdflib.RDFS.label, rdflib.Literal(habitat)))

# Retrieve vocab for field
vocab = self.fields()["habitat"].get_vocab()
vocab = self.fields()["habitat"].get_flexible_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(habitat)
# Add value
Expand Down Expand Up @@ -2467,7 +2466,7 @@ def add_basis_value(
graph.add((uri, rdflib.RDFS.label, rdflib.Literal(basis_of_record)))

# Retrieve vocab for field
vocab = self.fields()["basisOfRecord"].get_vocab()
vocab = self.fields()["basisOfRecord"].get_flexible_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(basis_of_record)
# Add value
Expand Down Expand Up @@ -2618,7 +2617,7 @@ def add_occurrence_status_value(
return

# Retrieve vocab for field
vocab = self.fields()["occurrenceStatus"].get_vocab()
vocab = self.fields()["occurrenceStatus"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["occurrenceStatus"])
Expand Down Expand Up @@ -2688,7 +2687,7 @@ def add_preparations_value(
graph.add((uri, rdflib.RDFS.label, rdflib.Literal(preparations)))

# Retrieve vocab for field
vocab = self.fields()["preparations"].get_vocab()
vocab = self.fields()["preparations"].get_flexible_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(preparations)
# Add value
Expand Down Expand Up @@ -2810,7 +2809,7 @@ def add_establishment_means_value(
return

# Retrieve vocab for field
vocab = self.fields()["establishmentMeans"].get_vocab()
vocab = self.fields()["establishmentMeans"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["establishmentMeans"])
Expand Down Expand Up @@ -2904,7 +2903,7 @@ def add_life_stage_value(
return

# Retrieve vocab for field
vocab = self.fields()["lifeStage"].get_vocab()
vocab = self.fields()["lifeStage"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["lifeStage"])
Expand Down Expand Up @@ -2997,7 +2996,7 @@ def add_sex_value(
return

# Retrieve vocab for field
vocab = self.fields()["sex"].get_vocab()
vocab = self.fields()["sex"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["sex"])
Expand Down Expand Up @@ -3091,7 +3090,7 @@ def add_reproductive_condition_value(
return

# Retrieve vocab for field
vocab = self.fields()["reproductiveCondition"].get_vocab()
vocab = self.fields()["reproductiveCondition"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["reproductiveCondition"])
Expand Down Expand Up @@ -3215,7 +3214,7 @@ def add_sampling_sequencing(
)

# Retrieve vocab for field
vocab = self.fields()["sequencingMethod"].get_vocab()
vocab = self.fields()["sequencingMethod"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["sequencingMethod"])
Expand Down Expand Up @@ -3357,7 +3356,7 @@ def add_threat_status_observation(
)

# Retrieve vocab for field
vocab = self.fields()["threatStatusCheckProtocol"].get_vocab()
vocab = self.fields()["threatStatusCheckProtocol"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(row["threatStatusCheckProtocol"])
Expand Down Expand Up @@ -3421,7 +3420,7 @@ def add_threat_status_value(
value = f"{row['conservationAuthority']}/{row['threatStatus']}"

# Retrieve vocab for field
vocab = self.fields()["threatStatus"].get_vocab()
vocab = self.fields()["threatStatus"].get_flexible_vocab()

# Retrieve term or Create on the Fly
term = vocab(graph=graph, source=dataset, base_iri=base_iri).get(value)
Expand Down Expand Up @@ -3469,15 +3468,13 @@ def add_conservation_authority_value(
uri: rdflib.URIRef | None,
conservation_authority: str | None,
graph: rdflib.Graph,
base_iri: rdflib.Namespace | None,
) -> None:
"""Adds Conservation Authority Value to the Graph

Args:
uri: URI to use for this node
conservation_authority: conservationAuthority value from the CSV
graph: Graph to add to
base_iri: Namespace used to construct IRIs)
"""
# Check Existence
if uri is None:
Expand All @@ -3492,8 +3489,8 @@ def add_conservation_authority_value(

# Retrieve vocab for field
vocab = self.fields()["conservationAuthority"].get_vocab()
# Retrieve term or Create on the Fly
term = vocab(graph=graph, base_iri=base_iri).get(conservation_authority)
# Retrieve term
term = vocab().get(conservation_authority)
# Conservation Authority Value
graph.add((uri, rdflib.RDF.value, term))

Expand Down Expand Up @@ -3592,7 +3589,7 @@ def add_sensitivity_category_value(
return

# Retrieve vocab for field
vocab = self.fields()["sensitivityCategory"].get_vocab()
vocab = self.fields()["sensitivityCategory"].get_flexible_vocab()
vocab_instance = vocab(graph=graph, source=dataset, base_iri=base_iri)

# Set the scope note to use if a new term is created on the fly.
Expand Down Expand Up @@ -3734,7 +3731,7 @@ def add_occurrence(
)

# Add feature type from vocab
kingdom_vocab = self.fields()["kingdom"].get_vocab("KINGDOM_OCCURRENCE")
kingdom_vocab = self.fields()["kingdom"].get_flexible_vocab("KINGDOM_OCCURRENCE")
kingdom_term = kingdom_vocab(graph=graph, base_iri=base_iri).get(row["kingdom"])
graph.add((uri, utils.namespaces.TERN.featureType, kingdom_term))

Expand Down Expand Up @@ -3787,7 +3784,7 @@ def add_occurrence(
graph.add((temporal_entity, rdflib.TIME.hasEnd, end_instant))

# Add procedure from vocab
protocol_vocab = self.fields()["samplingProtocol"].get_vocab()
protocol_vocab = self.fields()["samplingProtocol"].get_flexible_vocab()
protocol_term = protocol_vocab(graph=graph, base_iri=base_iri).get(row["samplingProtocol"])
graph.add((uri, rdflib.SOSA.usedProcedure, protocol_term))

Expand Down Expand Up @@ -3860,4 +3857,4 @@ def has_specimen(row: frictionless.Row) -> bool:


# Register mapper
base.mapper.ABISMapper.register_mapper(IncidentalOccurrenceMapper)
base.mapper.register_mapper(IncidentalOccurrenceMapper)
Loading
Loading