Skip to content

Commit

Permalink
Merge pull request #363 from gaiaresources/BDRSPS-1118-apply-mapping
Browse files Browse the repository at this point in the history
BDRSPS-1118 Refactor `apply_mapping` method by moving it to the base mapper class
  • Loading branch information
Lincoln-GR authored Dec 11, 2024
2 parents 0d22256 + 70dc8ea commit b5cfc38
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 460 deletions.
122 changes: 115 additions & 7 deletions abis_mapping/base/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,133 @@ def apply_validation(
frictionless.Report: Validation report for the data.
"""

def apply_mapping(
    self,
    *,
    data: base_types.ReadableType,
    chunk_size: int | None,
    dataset_iri: rdflib.URIRef | None = None,
    base_iri: rdflib.Namespace | None = None,
    **kwargs: Any,
) -> Iterator[rdflib.Graph]:
    """Applies Mapping from Raw Data to ABIS conformant RDF.

    Args:
        data: Readable raw data.
        chunk_size: Size of chunks to split raw data into. None to disable chunking.
        dataset_iri: Optional dataset IRI.
        base_iri: Optional mapping base IRI.
        **kwargs: Additional keyword arguments, forwarded to apply_mapping_row().

    Yields:
        rdflib.Graph: ABIS Conformant RDF Sub-Graph from Raw Data Chunk.

    Raises:
        ValueError: If chunk_size is provided but not greater than zero.
    """
    # Check chunk size
    if chunk_size is not None and chunk_size <= 0:
        raise ValueError("chunk_size must be greater than zero")

    # Construct Schema (template plus extra fields) and extra fields schema
    schema = self.extra_fields_schema(
        data=data,
        full_schema=True,
    )
    extra_schema = self.extra_fields_schema(
        data=data,
        full_schema=False,
    )

    # Construct Resource
    resource = frictionless.Resource(
        source=data,
        format="csv",  # TODO -> Hardcoded to csv for now
        schema=schema,
        encoding="utf-8",
    )

    # Initialise Graph; track whether any rows were mapped into the
    # current graph so an empty trailing chunk is never yielded.
    graph = utils.rdf.create_graph()
    graph_has_rows: bool = False

    # Check if Dataset IRI Supplied
    if not dataset_iri:
        # If not supplied, create example "default" Dataset IRI
        dataset_iri = utils.rdf.uri(f"dataset/{self.DATASET_DEFAULT_NAME}", base_iri)

        # Add the default dataset
        self.add_default_dataset(
            uri=dataset_iri,
            base_iri=base_iri,
            graph=graph,
        )

    # Add per-chunk mapping for first chunk
    self.apply_mapping_chunk(dataset=dataset_iri, graph=graph)

    # Open the Resource to allow row streaming
    with resource.open() as r:
        # Loop through rows
        for row_num, row in enumerate(r.row_stream, start=1):
            # Map row
            self.apply_mapping_row(
                row=row,
                dataset=dataset_iri,
                graph=graph,
                extra_schema=extra_schema,
                base_iri=base_iri,
                **kwargs,
            )
            graph_has_rows = True

            # yield chunk if required
            if chunk_size is not None and row_num % chunk_size == 0:
                yield graph
                # Initialise New Graph for next chunk
                graph = utils.rdf.create_graph()
                graph_has_rows = False
                self.apply_mapping_chunk(dataset=dataset_iri, graph=graph)

    # yield final chunk, or whole graph if not chunking.
    if graph_has_rows or chunk_size is None:
        yield graph

def apply_mapping_chunk(
    self,
    *,
    dataset: rdflib.URIRef,
    graph: rdflib.Graph,
) -> None:
    """Adds the RDF statements that must be present in every yielded chunk.

    Subclasses may extend this to add their own per-chunk statements,
    but must remember to call super()!

    Args:
        dataset: The Dataset URI.
        graph: The graph for the chunk to add the mapping to.
    """
    # Declared in every chunk so that a consumer holding only a single
    # chunk can still resolve the type of the dataset IRI.
    graph.add((dataset, a, utils.namespaces.TERN.Dataset))

@abc.abstractmethod
def apply_mapping_row(
    self,
    *,
    row: frictionless.Row,
    dataset: rdflib.URIRef,
    graph: rdflib.Graph,
    extra_schema: frictionless.Schema,
    base_iri: rdflib.Namespace | None,
    **kwargs: Any,
) -> None:
    """Applies Mapping for a Row in the template by mutating the passed Graph.

    Abstract hook called once per row by apply_mapping(); each template's
    mapper subclass must implement it.

    Args:
        row: Row from the template to be processed.
        dataset: Dataset URI.
        graph: Graph to map row into; mutated in place rather than returned.
        extra_schema: Template schema including any extra fields.
        base_iri: Optional base IRI namespace to use for mapping.
        **kwargs: Additional keyword arguments.
    """

def add_default_dataset(
self,
Expand Down
101 changes: 5 additions & 96 deletions abis_mapping/templates/incidental_occurrence_data_v3/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abis_mapping import vocabs

# Typing
from typing import Iterator, Optional, Any
from typing import Any


# Constants and Shortcuts
Expand Down Expand Up @@ -141,104 +141,16 @@ def apply_validation(
# Return Validation Report
return report

def apply_mapping(
    self,
    data: base.types.ReadableType,
    dataset_iri: Optional[rdflib.URIRef] = None,
    base_iri: Optional[rdflib.Namespace] = None,
    **kwargs: Any,
) -> Iterator[rdflib.Graph]:
    """Applies Mapping for the `incidental_occurrence_data.csv` Template

    Args:
        data (base.types.ReadableType): Valid raw data to be mapped.
        dataset_iri (Optional[rdflib.URIRef]): Optional dataset IRI.
        base_iri (Optional[rdflib.Namespace]): Optional mapping base IRI.

    Keyword Args:
        chunk_size (Optional[int]): How many rows of the original data to
            ingest before yielding a graph. `None` will ingest all rows.

    Yields:
        rdflib.Graph: ABIS Conformant RDF Sub-Graph from Raw Data Chunk.
    """
    # Extract keyword arguments; any invalid value disables chunking.
    chunk_size = kwargs.get("chunk_size")
    if not isinstance(chunk_size, int) or chunk_size < 1:
        chunk_size = None

    # Construct Schema
    schema = self.extra_fields_schema(
        data=data,
        full_schema=True,
    )
    extra_schema = self.extra_fields_schema(
        data=data,
        full_schema=False,
    )

    # Construct Resource
    resource = frictionless.Resource(
        source=data,
        format="csv",  # TODO -> Hardcoded to csv for now
        schema=schema,
        encoding="utf-8",
    )

    # Initialise Graph; track whether the current graph has any mapped
    # rows so an empty trailing chunk is never yielded when the row
    # count is an exact multiple of chunk_size.
    graph = utils.rdf.create_graph()
    graph_has_rows = False

    # Check if Dataset IRI Supplied
    if dataset_iri:
        # If supplied, add just the dataset type.
        graph.add((dataset_iri, a, utils.namespaces.TERN.Dataset))
    else:
        # If not supplied, create example "default" Dataset IRI
        dataset_iri = utils.rdf.uri(f"dataset/{self.DATASET_DEFAULT_NAME}", base_iri)

        # Add Example Default Dataset if not Supplied
        self.add_default_dataset(
            uri=dataset_iri,
            base_iri=base_iri,
            graph=graph,
        )

    # Open the Resource to allow row streaming
    with resource.open() as r:
        # Loop through Rows
        for row in r.row_stream:
            # Map Row
            self.apply_mapping_row(
                row=row,
                dataset=dataset_iri,
                graph=graph,
                extra_schema=extra_schema,
                base_iri=base_iri,
            )
            graph_has_rows = True

            # Check Whether to Yield a Chunk
            # The row_number needs to be reduced by one as the numbering of rows
            # in a Resource includes the header
            if chunk_size is not None and (row.row_number - 1) % chunk_size == 0:
                # Yield Chunk
                yield graph

                # Initialise New Graph
                graph = utils.rdf.create_graph()
                graph_has_rows = False
                # Every chunk should have this node
                graph.add((dataset_iri, a, utils.namespaces.TERN.Dataset))

    # Yield the final chunk only if it contains mapped rows, or the
    # whole graph when not chunking.
    if graph_has_rows or chunk_size is None:
        yield graph

def apply_mapping_row(
self,
*,
row: frictionless.Row,
dataset: rdflib.URIRef,
graph: rdflib.Graph,
extra_schema: frictionless.Schema,
base_iri: Optional[rdflib.Namespace] = None,
) -> rdflib.Graph:
base_iri: rdflib.Namespace | None,
**kwargs: Any,
) -> None:
"""Applies Mapping for a Row in the `incidental_occurrence_data.csv` Template
Args:
Expand Down Expand Up @@ -1166,9 +1078,6 @@ def apply_mapping_row(
extra_schema=extra_schema,
)

# Return
return graph

def add_provider_identified(
self,
uri: rdflib.URIRef,
Expand Down
75 changes: 4 additions & 71 deletions abis_mapping/templates/survey_metadata_v2/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abis_mapping import utils

# Typing
from typing import Optional, Iterator, Any
from typing import Any


# Constants / shortcuts
Expand Down Expand Up @@ -106,82 +106,15 @@ def apply_validation(self, data: base.types.ReadableType, **kwargs: Any) -> fric
# Return validation report
return report

def apply_mapping(
    self,
    data: base.types.ReadableType,
    dataset_iri: Optional[rdflib.URIRef] = None,
    base_iri: Optional[rdflib.Namespace] = None,
    **kwargs: Any,
) -> Iterator[rdflib.Graph]:
    """Applies mapping for the `survey_metadata.csv` template.

    Args:
        data (base.types.ReadableType): Valid raw data to be mapped.
        dataset_iri (Optional[rdflib.URIRef]): Optional dataset IRI.
        base_iri (Optional[rdflib.Namespace]): Optional mapping base IRI.
        **kwargs (Any): Additional keyword arguments.

    Yields:
        rdflib.Graph: ABIS conformant RDF sub-graph from raw data chunk.
    """
    # Build both schemas up front: the full template schema drives the
    # frictionless Resource, the extra-fields-only schema is handed to
    # each row mapping call.
    full_schema = self.extra_fields_schema(data=data, full_schema=True)
    extra_schema = self.extra_fields_schema(data=data, full_schema=False)

    # Wrap the raw data in a frictionless Resource for row streaming
    resource = frictionless.Resource(
        source=data,
        format="csv",  # TODO -> Hardcoded to csv for now
        schema=full_schema,
        encoding="utf-8",
    )

    # Fresh graph to accumulate the mapped RDF
    graph = utils.rdf.create_graph()

    if dataset_iri:
        # A dataset IRI was supplied: record only its type.
        graph.add((dataset_iri, a, utils.namespaces.TERN.Dataset))
    else:
        # No dataset IRI supplied: fall back to an example "default"
        # dataset IRI and emit the full default dataset description.
        dataset_iri = utils.rdf.uri(f"dataset/{self.DATASET_DEFAULT_NAME}", base_iri)
        self.add_default_dataset(
            uri=dataset_iri,
            base_iri=base_iri,
            graph=graph,
        )

    # Stream every row through the per-row mapper, mutating the graph
    with resource.open() as opened:
        for row in opened.row_stream:
            self.apply_mapping_row(
                row=row,
                dataset=dataset_iri,
                graph=graph,
                extra_schema=extra_schema,
                base_iri=base_iri,
            )

    # This template is not chunked: the whole mapping is one graph
    yield graph

def apply_mapping_row(
self,
*,
row: frictionless.Row,
dataset: rdflib.URIRef,
graph: rdflib.Graph,
extra_schema: frictionless.Schema,
base_iri: Optional[rdflib.Namespace] = None,
base_iri: rdflib.Namespace | None,
**kwargs: Any,
) -> None:
"""Applies mapping for a row in the `survey_metadata.csv` template.
Expand Down
Loading

0 comments on commit b5cfc38

Please sign in to comment.