Skip to content

Commit

Permalink
Merge pull request #4 from pressler-vsc/file-object-io
Browse files Browse the repository at this point in the history
Standardize I/O interfaces around file objects
  • Loading branch information
pressler-vsc authored Dec 23, 2024
2 parents 9f06f60 + 933aaee commit ff9d7ae
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 200 deletions.
86 changes: 34 additions & 52 deletions sarkit/standards/cphd/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@
import importlib.resources
import logging
import os
from typing import BinaryIO, cast

import lxml.etree
import numpy as np
import numpy.typing as npt

import sarkit.standards.general.utils

SCHEMA_DIR = importlib.resources.files("sarkit.standards.cphd.schemas")
CPHD_SECTION_TERMINATOR = b"\f\n"
DEFINED_HEADER_KEYS = {
Expand Down Expand Up @@ -285,12 +282,12 @@ class CphdReader:
Parameters
----------
file : file-like or path-like
file : `file object`
CPHD file to read
Examples
--------
>>> with cphd_io.CphdReader(file) as reader:
>>> with cphd_path.open('rb') as file, CphdReader(file) as reader:
... cphd_xmltree = reader.cphd_xmltree
... signal, pvp = reader.read_channel(<chan_id>)
Expand All @@ -302,29 +299,23 @@ class CphdReader:
CPHD XML ElementTree
plan : :py:class:`CphdPlan`
A CphdPlan object suitable for use in a CphdWriter
xml_block_size
xml_block_byte_offset
pvp_block_size
pvp_block_byte_offset
signal_block_size
signal_block_byte_offset
support_block_size
support_block_byte_offset
xml_block_size : int
xml_block_byte_offset : int
pvp_block_size : int
pvp_block_byte_offset : int
signal_block_size : int
signal_block_byte_offset : int
support_block_size : int or None
support_block_byte_offset : int or None
See Also
--------
CphdPlan
CphdWriter
"""

def __init__(self, file: BinaryIO | str | os.PathLike):
if sarkit.standards.general.utils.is_file_like(file):
self._file_owned = False
self._file_object = file
else:
file = cast(str | os.PathLike, file)
self._file_owned = True
self._file_object = open(file, "rb")
def __init__(self, file):
self._file_object = file

# skip the version line and read header
_, self._kvp_list = read_file_header(self._file_object)
Expand All @@ -347,45 +338,45 @@ def __init__(self, file: BinaryIO | str | os.PathLike):
)

@property
def xml_block_byte_offset(self):
def xml_block_byte_offset(self) -> int:
"""Offset to the XML block"""
return int(self._kvp_list["XML_BLOCK_BYTE_OFFSET"])

@property
def xml_block_size(self):
def xml_block_size(self) -> int:
"""Size of the XML block"""
return int(self._kvp_list["XML_BLOCK_SIZE"])

@property
def pvp_block_byte_offset(self):
def pvp_block_byte_offset(self) -> int:
"""Offset to the PVP block"""
return int(self._kvp_list["PVP_BLOCK_BYTE_OFFSET"])

@property
def pvp_block_size(self):
def pvp_block_size(self) -> int:
"""Size of the PVP block"""
return int(self._kvp_list["PVP_BLOCK_SIZE"])

@property
def signal_block_byte_offset(self):
def signal_block_byte_offset(self) -> int:
"""Offset to the Signal block"""
return int(self._kvp_list["SIGNAL_BLOCK_BYTE_OFFSET"])

@property
def signal_block_size(self):
def signal_block_size(self) -> int:
"""Size of the Signal block"""
return int(self._kvp_list["SIGNAL_BLOCK_SIZE"])

@property
def support_block_byte_offset(self):
def support_block_byte_offset(self) -> int | None:
"""Offset to the Support block"""
if "SUPPORT_BLOCK_BYTE_OFFSET" in self._kvp_list:
return int(self._kvp_list["SUPPORT_BLOCK_BYTE_OFFSET"])
else:
return None

@property
def support_block_size(self):
def support_block_size(self) -> int | None:
"""Size of the Support block"""
if "SUPPORT_BLOCK_SIZE" in self._kvp_list:
return int(self._kvp_list["SUPPORT_BLOCK_SIZE"])
Expand Down Expand Up @@ -490,16 +481,15 @@ def read_support_array(self, sa_identifier):
shape
)

def close(self):
"""Close any files opened by the reader"""
if self._file_owned:
self._file_object.close()
def done(self):
"Indicates to the reader that the user is done with it"
self._file_object = None

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.close()
self.done()


class CphdWriter:
Expand All @@ -509,7 +499,7 @@ class CphdWriter:
Parameters
----------
file : file-like or path-like
file : `file object`
CPHD file to write
plan : :py:class:`CphdPlan`
A CphdPlan object
Expand All @@ -520,7 +510,7 @@ class CphdWriter:
Examples
--------
>>> with cphd_io.CphdWriter(file, plan) as writer:
>>> with output_path.open('wb') as file, CphdWriter(file, plan) as writer:
... writer.write_signal("1", signal)
... writer.write_pvp("1", pvp)
Expand All @@ -531,12 +521,7 @@ class CphdWriter:
"""

def __init__(self, file, plan):
if sarkit.standards.general.utils.is_file_like(file):
self._file_owned = False
self._file_object = file
else:
self._file_owned = True
self._file_object = open(file, "wb")
self._file_object = file

self._plan = plan

Expand Down Expand Up @@ -756,15 +741,8 @@ def write_support_array(
output_dtype = support_array.dtype.newbyteorder(">")
support_array.astype(output_dtype, copy=False).tofile(self._file_object)

def close(self):
"""Close any files opened by the writer"""
if self._file_owned:
self._file_object.close()

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
def done(self):
"""Warn about unwritten arrays declared in the XML"""
channel_names = set(
node.text
for node in self._plan.cphd_xmltree.findall(
Expand Down Expand Up @@ -793,4 +771,8 @@ def __exit__(self, *args, **kwargs):
if missing_sa:
logging.warning(f"Not all Support Arrays written. Missing {missing_sa}")

self.close()
def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.done()
3 changes: 0 additions & 3 deletions sarkit/standards/sicd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
.. autosummary::
:toctree: generated/
read_sicd_xml
SicdNitfReader
SicdNitfWriter
Expand Down Expand Up @@ -145,7 +144,6 @@
SicdNitfReader,
SicdNitfSecurityFields,
SicdNitfWriter,
read_sicd_xml,
)
from .projection.derived import (
image_to_constant_hae_surface,
Expand All @@ -162,7 +160,6 @@
"SicdNitfReader",
"SicdNitfSecurityFields",
"SicdNitfWriter",
"read_sicd_xml",
]

# Projections
Expand Down
70 changes: 16 additions & 54 deletions sarkit/standards/sicd/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import datetime
import importlib.resources
import itertools
import os
import warnings
from typing import Any, BinaryIO, Self, cast
from typing import Any, Self

import lxml.etree
import numpy as np
Expand Down Expand Up @@ -425,21 +424,21 @@ class SicdNitfReader:
Parameters
----------
file : file-like or path-like
file : `file object`
SICD NITF file to read
Examples
--------
>>> with SicdNitfReader(sicd_filename) as reader:
>>> with sicd_path.open('rb') as file, SicdNitfReader(file) as reader:
... sicd_xmltree = reader.sicd_xmltree
... pixels = reader.read_image()
Attributes
----------
sicd_xmltree
header_fields
is_fields
des_fields
sicd_xmltree : lxml.etree.ElementTree
header_fields : SicdNitfHeaderFields
is_fields : SicdNitfImageSegmentFields
des_fields : SicdNitfDESegmentFields
nitf_plan : :py:class:`SicdNitfPlan`
A SicdNitfPlan object suitable for use in a SicdNitfWriter
Expand All @@ -449,14 +448,8 @@ class SicdNitfReader:
SicdNitfWriter
"""

def __init__(self, file: BinaryIO | str | os.PathLike):
if sarkit.standards.general.utils.is_file_like(file):
self._file_owned = False
self._file_object = file
else:
file = cast(str | os.PathLike, file)
self._file_owned = True
self._file_object = open(file, "rb")
def __init__(self, file):
self._file_object = file

self._initial_offset = self._file_object.tell()
if self._initial_offset != 0:
Expand Down Expand Up @@ -556,16 +549,15 @@ def read_sub_image(
# TODO update XML
raise NotImplementedError()

def close(self):
"""Close any files opened by the reader"""
if self._file_owned:
self._file_object.close()
def done(self):
"Indicates to the reader that the user is done with it"
self._file_object = None

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.close()
self.done()


def _create_des_manager(sicd_xmltree, des_fields):
Expand Down Expand Up @@ -607,7 +599,7 @@ class SicdNitfWriter:
Parameters
----------
file : file-like
file : `file object`
SICD NITF file to write
nitf_plan : :py:class:`SicdNitfPlan`
NITF plan object
Expand All @@ -624,7 +616,7 @@ class SicdNitfWriter:
... is_fields=SicdNitfImageSegmentFields(isorce='my sensor',
... security=SicdNitfSecurityFields(clas='U')),
... des_fields=SicdNitfDESegmentFields(security=SicdNitfSecurityFields(clas='U')))
>>> with SicdNitfWriter(output_filename, plan) as writer:
>>> with output_path.open('wb') as file, SicdNitfWriter(file, plan) as writer:
... writer.write_image(pixel_array)
See Also
Expand All @@ -634,12 +626,7 @@ class SicdNitfWriter:
"""

def __init__(self, file, nitf_plan: SicdNitfPlan):
if sarkit.standards.general.utils.is_file_like(file):
self._file_owned = False
self._file_object = file
else:
self._file_owned = True
self._file_object = open(file, "wb")
self._file_object = file

self._initial_offset = self._file_object.tell()
if self._initial_offset != 0:
Expand Down Expand Up @@ -822,34 +809,9 @@ def close(self):
Called automatically when SicdNitfWriter is used as a context manager
"""
self._nitf_writer.close()
if self._file_owned:
self._file_object.close()

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.close()


def read_sicd_xml(file: str | os.PathLike | BinaryIO) -> lxml.etree.ElementTree:
"""Convenience function for reading SICD XML from a file
Parameters
----------
file : file-like or path-like
SICD XML or NITF file
Returns
-------
lxml.etree.ElementTree
SICD XML ElementTree
"""

try:
return lxml.etree.parse(file)
except lxml.etree.XMLSyntaxError:
pass

with SicdNitfReader(file) as reader:
return reader.sicd_xmltree
Loading

0 comments on commit ff9d7ae

Please sign in to comment.