Port fixes from: sarpy#508, sarpy#530, sarpy#538, sarpy#561; fix docs warning #2

Merged
111 changes: 42 additions & 69 deletions sarkit/standards/general/nitf.py
@@ -4,7 +4,7 @@
import os
import platform
import struct
from collections import OrderedDict, namedtuple
from collections import OrderedDict
from typing import BinaryIO, List, Optional, Sequence, Tuple, Union

import numpy
@@ -378,51 +378,6 @@ def _verify_image_segment_compatibility(
return True


def _correctly_order_image_segment_collection(
image_headers: Sequence[ImageSegmentHeader],
) -> Tuple[int, ...]:
"""
Determines the proper order, based on IALVL and IDLVL, for a collection of entries
which will be assembled into a composite image.

Parameters
----------
image_headers : Sequence[ImageSegmentHeader]

Returns
-------
Tuple[int, ...]

Raises
------
ValueError
If incompatible IALVL values collection
"""

ImLvl = namedtuple("ImLvl", ["index", "IALVL", "IDLVL"])
im_levels = sorted(
[
ImLvl(index=n, IALVL=hdr.IALVL, IDLVL=hdr.IDLVL)
for n, hdr in enumerate(image_headers)
],
key=lambda x: x.IDLVL,
)
if all(entry.IALVL == 0 for entry in im_levels):
# all IALVL is 0, and order doesn't matter
return tuple(range(len(image_headers)))
if im_levels[0].IALVL == 0 and all(
im_levels[n].IALVL == im_levels[n - 1].IDLVL for n in range(1, len(im_levels))
):
# From ISO/IEC Joint BIIF Profile JBP-2024.1 - 4.5.4.2 Attachment Level (ALVL) Interpretation
# The image or graphic segment in the MI collection (multi files) with the minimum display level has an
# attachment level of zero.
# The attachment level of an item is equal to the display level of the item to which it is "attached.""
return tuple(x.index for x in im_levels)
raise ValueError(
f"Unable to interpret image segment attachment/display levels:\n{im_levels}\n per ISO/IEC JBP"
)


def _get_collection_element_coordinate_limits(
image_headers: Sequence[ImageSegmentHeader], return_clevel: bool = False
) -> Union[numpy.ndarray, Tuple[numpy.ndarray, int]]:
@@ -447,27 +402,32 @@ def _get_collection_element_coordinate_limits(
The CLEVEL for this common coordinate system, only returned if
`return_clevel=True`
"""
-    the_indices = _correctly_order_image_segment_collection(image_headers)
-
-    block_definition = numpy.empty((len(the_indices), 4), dtype="int64")
-    for i, image_ind in enumerate(the_indices):
-        img_header = image_headers[image_ind]
-        rows = img_header.NROWS
-        cols = img_header.NCOLS
-        iloc = img_header.ILOC
-        if img_header.IALVL == 0 or i == 0:
-            previous_indices = numpy.zeros((4,), dtype="int64")
-        else:
-            previous_indices = block_definition[i - 1, :]
-        rel_row_start, rel_col_start = int(iloc[:5]), int(iloc[5:])
-        abs_row_start = rel_row_start + previous_indices[0]
-        abs_col_start = rel_col_start + previous_indices[2]
-        block_definition[i, :] = (
-            abs_row_start,
-            abs_row_start + rows,
-            abs_col_start,
-            abs_col_start + cols,
-        )
+    unique_idlvls = set(im.IDLVL for im in image_headers)
+    image_headers = sorted(image_headers, key=lambda x: x.IDLVL)
+    if len(unique_idlvls) != len(image_headers):
+        raise ValueError(
+            'Headers violate: "Every image and graphic component in a JBP file will have a unique display level"'
+        )
+    if not all(
+        (im.IALVL in unique_idlvls) and (im.IALVL < im.IDLVL)
+        for im in image_headers[1:]
+    ):
+        raise ValueError(
+            'Headers violate: "The attachment level of an item is equal to the display level of the item to which '
+            'it is “attached.” Items can only be attached to existing items at a lower display level."'
+        )
+    # IDLVL -> [Row, Col]
+    loc = {image_headers[0].IALVL: numpy.zeros(2)}
+
+    block_definition = numpy.empty((len(image_headers), 4), dtype="int64")
+    for i, im in enumerate(image_headers):
+        iloc_offset = numpy.array([int(im.ILOC[:5]), int(im.ILOC[5:])])
+        loc[im.IDLVL] = loc[im.IALVL] + iloc_offset
+        block_definition[i, :] = (
+            loc[im.IDLVL][0],
+            loc[im.IDLVL][0] + im.NROWS,
+            loc[im.IDLVL][1],
+            loc[im.IDLVL][1] + im.NCOLS,
+        )

# now, re-normalize the coordinate system to be sensible
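The new placement logic reads directly off the JBP attachment rules: each segment's absolute [row, col] origin is the origin of the segment it is attached to (found by display level, with attachment level 0 pinned to the origin) plus its own ILOC offset. A minimal standalone sketch of that rule, using illustrative names and plain dicts rather than sarkit's header objects:

import numpy as np

def place_segments(segments):
    # segments: dicts with IDLVL, IALVL, ILOC ("rrrrrccccc"), NROWS, NCOLS,
    # already validated (unique IDLVLs, IALVL < IDLVL, minimum IDLVL attached to level 0)
    segments = sorted(segments, key=lambda s: s["IDLVL"])
    loc = {segments[0]["IALVL"]: np.zeros(2, dtype="int64")}  # display level -> absolute [row, col]
    bounds = []
    for seg in segments:
        offset = np.array([int(seg["ILOC"][:5]), int(seg["ILOC"][5:])])
        loc[seg["IDLVL"]] = loc[seg["IALVL"]] + offset
        row0, col0 = loc[seg["IDLVL"]]
        bounds.append((row0, row0 + seg["NROWS"], col0, col0 + seg["NCOLS"]))
    return bounds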
@@ -1041,7 +1001,7 @@ def get_headers_json(self) -> dict:
]
return out

def __del__(self):
def close(self):
if self._close_after:
self._close_after = False
# noinspection PyBroadException
@@ -1050,6 +1010,9 @@ def __del__(self):
except Exception:
pass

def __del__(self):
self.close()
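The resource handling is now split: close() releases the underlying file only if this object opened it (guarded by the _close_after flag, so repeated calls are harmless), and __del__ simply delegates to close() so interpreter teardown stays safe. The same pattern, sketched as a minimal illustrative class (not sarkit API):

class OwnedFile:
    def __init__(self, file_or_path):
        # own (and later close) the handle only when we opened it from a path
        self._close_after = isinstance(file_or_path, str)
        self._file_object = open(file_or_path, "rb") if self._close_after else file_or_path

    def close(self):
        if self._close_after:
            self._close_after = False
            try:
                self._file_object.close()
            except Exception:
                pass

    def __del__(self):
        self.close()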


class NITFReader(BaseReader):
"""A reader implementation based around array-type image data fetching for NITF 2.1 files."""
@@ -1548,7 +1511,7 @@ def _handle_no_compression(
raw_dtype,
complex_order,
lut,
raw_band_dimension,
2,
image_segment_index=image_segment_index,
)
use_transpose = self._transpose_axes
@@ -2176,6 +2139,7 @@ def get_data_segments(self) -> List[DataSegment]:
return out

def close(self) -> None:
self._nitf_details.close()
self._image_segment_data_segments = None
BaseReader.close(self)

@@ -3290,6 +3254,7 @@ class NITFWriter(BaseWriter):
"_in_memory",
"_nitf_writing_details",
"_image_segment_data_segments",
"_close_after",
)

def __init__(
@@ -3315,6 +3280,7 @@ def __init__(

self._nitf_writing_details = None
self._image_segment_data_segments = [] # type: List[DataSegment]
self._close_after = False

if isinstance(file_object, str):
if check_existence and os.path.exists(file_object):
@@ -3324,6 +3290,7 @@
)
)
file_object = open(file_object, "wb")
self._close_after = True

if not is_file_like(file_object):
raise ValueError("file_object requires a file path or BinaryIO object")
@@ -3699,7 +3666,7 @@ def _handle_no_compression(
raw_dtype,
complex_order,
lut,
raw_band_dimension,
2,
image_segment_index=image_segment_index,
)
use_transpose = None
@@ -4105,4 +4072,10 @@ def close(self) -> None:

self._nitf_writing_details = None
self._image_segment_data_segments = None
self._file_object.close()
if self._close_after:
self._close_after = False
# noinspection PyBroadException
try:
self._file_object.close()
except Exception:
pass
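Together with the new _close_after handling in __init__, NITFWriter.close() now only closes file handles it opened itself from a path; a caller-supplied file object is left open for the caller to manage. Roughly (writing_details and data are placeholders):

# path given: the writer opens the file and closes it on exit
with NITFWriter("out.nitf", writing_details=writing_details) as writer:
    writer.write(data)

# caller-supplied handle: the writer leaves it open
with open("out2.nitf", "wb") as fd:
    with NITFWriter(fd, writing_details=writing_details) as writer:
        writer.write(data)
    assert not fd.closed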
4 changes: 2 additions & 2 deletions sarkit/verification/consistency.py
@@ -326,8 +326,8 @@ def passes(self):
def skips(self, include_partial=False):
"""Returns passed checks that are No-Op.

Args
----
Parameters
----------
include_partial : bool, optional
Include checks that are partially No-Op? False by default.

Binary file added tests/standards/general/data/iq.nitf
23 changes: 23 additions & 0 deletions tests/standards/general/data/make_iq_nitf.py
@@ -0,0 +1,23 @@
"""Make a simple NITF 2.1 with two bands (I/Q) stored as band interleave by block."""

import pathlib

import numpy as np
from osgeo import gdal

data = np.arange(20, dtype=np.float32).reshape((2, 5, 2)).view(np.complex64)[..., 0]
dst_filename = pathlib.Path(__file__).parent / "iq.nitf"

driver = gdal.GetDriverByName("NITF")
dst_ds = driver.Create(
str(dst_filename),
xsize=data.shape[1],
ysize=data.shape[0],
bands=2,
eType=gdal.GDT_Float32,
options=["ISUBCAT=I,Q"],
)
dst_ds.GetRasterBand(1).WriteArray(data.real)
dst_ds.GetRasterBand(2).WriteArray(data.imag)
# As of 2024-11-19, the latest gdal available from anaconda (v3.6.2) does not provide Dataset.Close();
# dropping the reference flushes and closes the file instead
dst_ds = None
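As an aside on the first line of the script: viewing adjacent float32 pairs as complex64 is what turns the ramp 0..19 into a small 2x5 complex image. The same trick in isolation:

import numpy as np

pairs = np.arange(20, dtype=np.float32).reshape((2, 5, 2))  # last axis holds (real, imag)
cplx = pairs.view(np.complex64)[..., 0]                     # shape (2, 5), dtype complex64
assert cplx[0, 1] == 2 + 3j                                 # floats 2.0 and 3.0 pair up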
66 changes: 66 additions & 0 deletions tests/standards/general/test_nitf.py
@@ -0,0 +1,66 @@
import filecmp
import pathlib

import numpy as np

import sarkit.standards.general.nitf


def test_iq_band_interleaved_by_block(tmp_path):
in_nitf = pathlib.Path(__file__).parent / "data/iq.nitf"
with sarkit.standards.general.nitf.NITFReader(str(in_nitf)) as reader:
data = reader.read()
data_raw = reader.read_raw()
assert data_raw.ndim == data.ndim + 1
assert data_raw.size == data.size * 2
assert np.iscomplexobj(data)

data_offset = reader.nitf_details.img_segment_offsets[0]
manual_bytes = in_nitf.read_bytes()[data_offset : data_offset + data.nbytes]
manual_data_raw = np.frombuffer(manual_bytes, dtype=data_raw.dtype).reshape(
data_raw.shape
)
manual_data = (
manual_data_raw[0] + 1j * manual_data_raw[1]
) # interleaved by block
assert np.array_equal(data, manual_data)

out_nitf = tmp_path / "out.nitf"
writer_details = sarkit.standards.general.nitf.NITFWritingDetails(
reader.nitf_details.nitf_header,
(
sarkit.standards.general.nitf.ImageSubheaderManager(
reader.get_image_header(0)
),
),
reader.image_segment_collections,
)
with sarkit.standards.general.nitf.NITFWriter(
str(out_nitf), writing_details=writer_details
) as writer:
writer.write(data)
assert filecmp.cmp(in_nitf, out_nitf, shallow=False)
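The manual reconstruction above leans on the band-interleaved-by-block layout: within the image segment, all band-1 (I) samples of a block precede all band-2 (Q) samples, so here read_raw() exposes a leading band axis and the complex image is simply raw[0] + 1j * raw[1]. The same recombination on a made-up two-pixel block:

import numpy as np

raw = np.array([[[0.0, 2.0]], [[1.0, 3.0]]], dtype=np.float32)  # (band, rows, cols); band 0 = I, band 1 = Q
img = raw[0] + 1j * raw[1]
assert np.array_equal(img, [[0 + 1j, 2 + 3j]])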


def test_write_filehandle(tmp_path):
in_nitf = pathlib.Path(__file__).parent / "data/iq.nitf"
with sarkit.standards.general.nitf.NITFReader(str(in_nitf)) as reader:
data = reader.read()
writer_details = sarkit.standards.general.nitf.NITFWritingDetails(
reader.nitf_details.nitf_header,
(
sarkit.standards.general.nitf.ImageSubheaderManager(
reader.get_image_header(0)
),
),
reader.image_segment_collections,
)

out_nitf = tmp_path / "output.nitf"
with out_nitf.open("wb") as fd:
with sarkit.standards.general.nitf.NITFWriter(
fd, writing_details=writer_details
) as writer:
writer.write(data)

assert not fd.closed