Copy CSLC metadata to product in addition to compressed slc (#155)
* Copy CSLC metadata to product, not just compressed slc

* more logging for product creation

* `orbit_pass_direction` is in identification, `orbit_direction` was `metadata`
scottstanie authored Sep 27, 2024
1 parent a5ec761 commit f6bc37d
Showing 3 changed files with 90 additions and 30 deletions.
src/disp_s1/main.py (2 additions & 0 deletions)
@@ -294,6 +294,8 @@ def create_displacement_products(
         Default is 2.
     """
+    # Extra logging for product creation
+    setup_logging(logger_name="disp_s1", debug=True)
     tropo_files = out_paths.tropospheric_corrections or [None] * len(
         out_paths.timeseries_paths
     )
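For context, a minimal sketch of what a `setup_logging` helper like the one called above typically does. The actual disp_s1 implementation is not shown in this diff, so the handler, format string, and level logic here are assumptions:

    import logging

    def setup_logging(logger_name: str = "disp_s1", debug: bool = False) -> None:
        # Route the named logger to stderr, at DEBUG when requested, INFO otherwise.
        logger = logging.getLogger(logger_name)
        logger.setLevel(logging.DEBUG if debug else logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("[%(asctime)s] [%(levelname)s %(name)s] %(message)s")
        )
        logger.addHandler(handler)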
src/disp_s1/product.py (80 additions & 29 deletions)
@@ -4,7 +4,7 @@

 import datetime
 import logging
-from collections.abc import Mapping
+from collections.abc import Iterable, Mapping
 from concurrent.futures import ProcessPoolExecutor
 from io import StringIO
 from multiprocessing import get_context
@@ -330,6 +330,11 @@ def create_output_product(
         pge_runconfig=pge_runconfig,
         dolphin_config=dolphin_config,
     )
+    copy_cslc_metadata_to_displacement(
+        reference_cslc_file=reference_start,
+        secondary_cslc_file=secondary_start,
+        output_disp_file=output_name,
+    )


 def _create_corrections_group(
@@ -595,7 +600,7 @@ def _create_metadata_group(
     def _to_string(model: YamlModel):
         ss = StringIO()
         model.to_yaml(ss)
-        return ss.getvalue()
+        return "".join(c for c in ss.getvalue() if ord(c) < 128)

     _create_dataset(
         group=metadata_group,
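The new return line strips every non-ASCII character from the YAML dump before it is stored as product metadata, presumably to avoid encoding issues in downstream HDF5/NetCDF readers. A standalone illustration of the same filter:

    # Keep only 7-bit ASCII; code points >= 128 are dropped.
    s = "area: 30 m\u00b2 \u00b1 0.5"
    ascii_only = "".join(c for c in s if ord(c) < 128)
    print(ascii_only)  # -> "area: 30 m  0.5"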
@@ -869,19 +874,49 @@ def process_compressed_slc(info: CompressedSLCInfo) -> Path:
         grid_mapping_dset_name=grid_mapping_dset_name,
     )

-    copy_opera_cslc_metadata(opera_cslc_file, outname)
+    copy_cslc_metadata_to_compressed(opera_cslc_file, outname)

     return outname


-def copy_opera_cslc_metadata(
-    comp_slc_file: Filename, output_hdf5_file: Filename
+def _copy_hdf5_dsets(
+    source_file: Filename,
+    dest_file: Filename,
+    dsets_to_copy: Iterable[str],
+    prepend_str: str = "",
+    error_on_missing: bool = False,
 ) -> None:
-    """Copy orbit and metadata datasets from the input CSLC file the compressed SLC.
+    with h5py.File(source_file, "r") as src, h5py.File(dest_file, "a") as dst:
+        for dset_path in dsets_to_copy:
+            if dset_path not in src:
+                msg = f"Dataset or group {dset_path} not found in {source_file}"
+                if error_on_missing:
+                    raise ValueError(msg)
+                else:
+                    logger.warning(msg)
+                continue
+
+            # Create parent group if it doesn't exist
+            out_group = str(Path(dset_path).parent)
+            dst.require_group(out_group)
+
+            # Remove existing dataset/group if it exists
+            if dset_path in dst:
+                del dst[dset_path]
+
+            # Copy the dataset or group
+            new_name = f"{prepend_str}{Path(dset_path).name}"
+            src.copy(src[dset_path], dst[str(Path(dset_path).parent)], name=new_name)
+
+
+def copy_cslc_metadata_to_compressed(
+    opera_cslc_file: Filename, output_hdf5_file: Filename
+) -> None:
+    """Copy orbit and metadata datasets from the input CSLC file to the compressed SLC.

     Parameters
     ----------
-    comp_slc_file : Filename
+    opera_cslc_file : Filename
         Path to the input CSLC file.
     output_hdf5_file : Filename
         Path to the output compressed SLC file.
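For reference, a minimal usage sketch of the new `_copy_hdf5_dsets` helper (both file names here are hypothetical). Note that as written, `prepend_str` is concatenated directly onto the dataset's base name, so copying `/metadata/orbit` with `prepend_str="reference"` produces a member named `referenceorbit` in the destination's `/metadata` group:

    _copy_hdf5_dsets(
        source_file="reference_cslc.h5",    # hypothetical input CSLC
        dest_file="disp_product.nc",        # hypothetical output product
        dsets_to_copy=["/metadata/orbit"],
        prepend_str="reference",
    )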
Expand All @@ -896,31 +931,47 @@ def copy_opera_cslc_metadata(
"/identification/look_direction",
"/identification/mission_id",
"/identification/track_number",
"/identification/orbit_direction",
"/identification/orbit_pass_direction",
]
_copy_hdf5_dsets(
source_file=opera_cslc_file,
dest_file=output_hdf5_file,
dsets_to_copy=dsets_to_copy,
)
logger.debug(f"Copied metadata from {opera_cslc_file} to {output_hdf5_file}")

with h5py.File(comp_slc_file, "r") as src, h5py.File(output_hdf5_file, "a") as dst:
for dset_path in dsets_to_copy:
if dset_path in src:
# Create parent group if it doesn't exist
dst.require_group(str(Path(dset_path).parent))

# Remove existing dataset/group if it exists
if dset_path in dst:
del dst[dset_path]

# Copy the dataset or group
src.copy(
src[dset_path],
dst[str(Path(dset_path).parent)],
name=Path(dset_path).name,
)
else:
logger.warning(
f"Dataset or group {dset_path} not found in {comp_slc_file}"
)

logger.info(f"Copied metadata from {comp_slc_file} to {output_hdf5_file}")
def copy_cslc_metadata_to_displacement(
reference_cslc_file: Filename,
secondary_cslc_file: Filename,
output_disp_file: Filename,
) -> None:
"""Copy metadata from input reference/secondary CSLC files to DISP output."""
dsets_to_copy = [
"/metadata/orbit", # Group
]
for cslc_file, prepend_str in zip(
[reference_cslc_file, secondary_cslc_file], ["reference", "secondary"]
):
_copy_hdf5_dsets(
source_file=cslc_file,
dest_file=output_disp_file,
dsets_to_copy=dsets_to_copy,
prepend_str=prepend_str,
)

# Add ones which should be same for both ref/sec
common_dsets = [
"/identification/mission_id",
"/identification/look_direction",
"/identification/track_number",
"/identification/orbit_pass_direction",
]
_copy_hdf5_dsets(
source_file=reference_cslc_file,
dest_file=output_disp_file,
dsets_to_copy=common_dsets,
)


def create_compressed_products(
Expand Down
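A hedged sketch of inspecting the copied metadata in a finished product (the file name is made up; the dataset paths follow the lists above):

    import h5py

    with h5py.File("disp_product.nc", "r") as hf:
        # Orbit group copied once per input CSLC, with the reference/secondary prefix
        print(list(hf["/metadata"]))
        # Identification datasets copied once, from the reference CSLC
        print(hf["/identification/orbit_pass_direction"][()])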
src/disp_s1/solid_earth_tides.py (8 additions & 1 deletion)
@@ -1,3 +1,4 @@
+import logging
 from concurrent.futures import ThreadPoolExecutor
 from datetime import date, datetime
 from typing import Optional, TypeAlias
@@ -18,6 +19,8 @@
 # https://github.com/pandas-dev/pandas-stubs/blob/1bc27e67098106089ce1e61b60c42aa81ec286af/pandas-stubs/_typing.pyi#L65-L66
 DateTimeLike: TypeAlias = date | datetime | pd.Timestamp

+logger = logging.getLogger(__name__)
+

 def resample_to_target(array: np.ndarray, target_shape: tuple[int, int]) -> np.ndarray:
     """Resample a 2D array to a target shape using zoom."""
@@ -180,8 +183,9 @@ def calculate_solid_earth_tides_correction(
     res_lat_arr = np.ones(ref_time_tile.shape) * atr["Y_STEP"]
     res_lon_arr = np.ones(ref_time_tile.shape) * atr["X_STEP"]

+    logger.info("Running `run_solid_grid` in parallel")
     # Parallelize grid calculation using ThreadPoolExecutor
-    with ThreadPoolExecutor() as executor:
+    with ThreadPoolExecutor(max_workers=5) as executor:
         ref_input_data = zip(
             ref_time_tile.ravel(),
             lat_geo_mesh.ravel(),
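The `executor.map(lambda x: run_solid_grid(*x), ...)` calls in the next hunk fan a star-args function out over these zipped per-pixel inputs. A self-contained sketch of the same pattern, using a stand-in since `run_solid_grid`'s real signature does not appear in this diff:

    from concurrent.futures import ThreadPoolExecutor

    def run_solid_grid_stub(t, lat, lon, dlat, dlon):
        # Stand-in: the real run_solid_grid returns (east, north, up) per point.
        return (t + lat, t + lon, dlat * dlon)

    inputs = zip([0.0, 1.0], [34.0, 34.1], [-118.0, -118.1], [0.01, 0.01], [0.01, 0.01])
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(lambda x: run_solid_grid_stub(*x), inputs))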
@@ -200,6 +204,7 @@
         ref_results = list(executor.map(lambda x: run_solid_grid(*x), ref_input_data))
         sec_results = list(executor.map(lambda x: run_solid_grid(*x), sec_input_data))

+    logger.info("Reshaping results")
     ref_e_flat, ref_n_flat, ref_u_flat = zip(*ref_results)
     set_east_ref, set_north_ref, set_up_ref = (
         np.array(ref_e_flat).reshape(atr["LENGTH"], atr["WIDTH"]),
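The `zip(*ref_results)` idiom above transposes a list of (east, north, up) tuples into three flat sequences, which are then reshaped back onto the LENGTH x WIDTH grid. In miniature:

    import numpy as np

    results = [(1, 4, 7), (2, 5, 8), (3, 6, 9)]  # one (e, n, u) tuple per pixel
    e_flat, n_flat, u_flat = zip(*results)
    east = np.array(e_flat).reshape(1, 3)  # toy grid: LENGTH=1, WIDTH=3
    print(east)  # [[1 2 3]]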
@@ -219,6 +224,7 @@
     x_coord_array = np.linspace(bounds.left, bounds.right, num=atr["WIDTH"])
     id_y, id_x = np.mgrid[0:height, 0:width]

+    logger.info(f"Transforming coordinates with {affine_transform}")
     # Convert grid indices (x, y) to UTM
     x, y = rasterio.transform.xy(affine_transform, id_y.ravel(), id_x.ravel())
     e_arr = np.clip(
@@ -229,6 +235,7 @@
     )

     # Interpolate SET components
+    logger.info("Interpolating components")
     set_east_interp, set_north_interp, set_up_interp = interpolate_set_components(
         n_arr,
         e_arr,