diff --git a/tests/test_coreg/test_affine.py b/tests/test_coreg/test_affine.py index 0baa6114..a57c2bd9 100644 --- a/tests/test_coreg/test_affine.py +++ b/tests/test_coreg/test_affine.py @@ -1,11 +1,11 @@ """Functions to test the affine coregistrations.""" from __future__ import annotations -import copy - import geopandas as gpd +import geoutils import numpy as np import pytest +import pytransform3d import rasterio as rio from geoutils import Raster, Vector from geoutils._typing import NDArrayNum @@ -13,13 +13,8 @@ from geoutils.raster.raster import _shift_transform from scipy.ndimage import binary_dilation -import xdem from xdem import coreg, examples -from xdem.coreg.affine import ( - AffineCoreg, - CoregDict, - _reproject_horizontal_shift_samecrs, -) +from xdem.coreg.affine import AffineCoreg, _reproject_horizontal_shift_samecrs def load_examples(crop: bool = True) -> tuple[RasterType, RasterType, Vector]: @@ -96,14 +91,38 @@ class TestAffineCoreg: ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. inlier_mask = ~outlines.create_mask(ref) - fit_params = dict( - reference_elev=ref.data, - to_be_aligned_elev=tba.data, + # Check all point-raster possibilities supported + # Use the reference DEM for both, it will be artificially misaligned during tests + # Raster-Raster + fit_args_rst_rst = dict( + reference_elev=ref, + to_be_aligned_elev=tba, inlier_mask=inlier_mask, - transform=ref.transform, - crs=ref.crs, verbose=True, ) + + # Convert DEMs to points with a bit of subsampling for speed-up + ref_pts = ref.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + tba_pts = ref.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + + # Raster-Point + fit_args_rst_pts = dict( + reference_elev=ref, + to_be_aligned_elev=tba_pts, + inlier_mask=inlier_mask, + verbose=True, + ) + + # Point-Raster + fit_args_pts_rst = dict( + reference_elev=ref_pts, + to_be_aligned_elev=tba, + inlier_mask=inlier_mask, + verbose=True, + ) + + all_fit_args = [fit_args_rst_rst, fit_args_rst_pts, fit_args_pts_rst] + # Create some 3D coordinates with Z coordinates being 0 to try the apply functions. points_arr = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T points = gpd.GeoDataFrame( @@ -172,55 +191,9 @@ def test_from_classmethods(self) -> None: if "non-finite values" not in str(exception): raise exception - def test_vertical_shift(self) -> None: - - # Create a vertical shift correction instance - vshiftcorr = coreg.VerticalShift() - # Fit the vertical shift model to the data - vshiftcorr.fit(**self.fit_params) - - res = self.ref.res[0] - - # Check that a vertical shift was found. - assert vshiftcorr.meta["outputs"]["affine"].get("shift_z") is not None - assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] != 0.0 - - # Copy the vertical shift to see if it changes in the test (it shouldn't) - vshift = copy.copy(vshiftcorr.meta["outputs"]["affine"]["shift_z"]) - - # Check that the to_matrix function works as it should - matrix = vshiftcorr.to_matrix() - assert matrix[2, 3] == vshift, matrix - - # Check that the first z coordinate is now the vertical shift - assert all(vshiftcorr.apply(self.points)["z"].values == vshiftcorr.meta["outputs"]["affine"]["shift_z"]) - - # Apply the model to correct the DEM - tba_unshifted, _ = vshiftcorr.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) - - # Create a new vertical shift correction model - vshiftcorr2 = coreg.VerticalShift() - # Check that this is indeed a new object - assert vshiftcorr is not vshiftcorr2 - # Fit the corrected DEM to see if the vertical shift will be close to or at zero - vshiftcorr2.fit( - reference_elev=self.ref.data, - to_be_aligned_elev=tba_unshifted, - transform=self.ref.transform, - crs=self.ref.crs, - inlier_mask=self.inlier_mask, - ) - # Test the vertical shift - newmeta: CoregDict = vshiftcorr2.meta - new_vshift = newmeta["outputs"]["affine"]["shift_z"] - assert np.abs(new_vshift) * res < 0.01 - - # Check that the original model's vertical shift has not changed - # (that the _.meta dicts are two different objects) - assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] == vshift - - def test_all_nans(self) -> None: + def test_raise_all_nans(self) -> None: """Check that the coregistration approaches fail gracefully when given only nans.""" + dem1 = np.ones((50, 50), dtype=float) dem2 = dem1.copy() + np.nan affine = rio.transform.from_origin(0, 0, 1, 1) @@ -238,152 +211,296 @@ def test_all_nans(self) -> None: pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) - def test_coreg_example(self, verbose: bool = False) -> None: + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("shifts", [(20, 5, 2), (-50, 100, 2)]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore + def test_coreg_translations__synthetic(self, fit_args, shifts, coreg_method) -> None: """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. + Test the horizontal/vertical shift coregistrations with synthetic shifted data. These tests include NuthKaab, + ICP and GradientDescending. + + We test all combinaison of inputs: raster-raster, point-raster and raster-point. + + We verify that the shifts found by the coregistration are within 1% of the synthetic shifts with opposite sign + of the ones introduced, and that applying the coregistration to the shifted elevations corrects more than + 99% of the variance from the initial elevation differences (hence, that the direction of coregistration has + to be the right one; and that there is no other errors introduced in the process). """ - # Use full DEMs here (to compare to original values from older package versions) - ref, tba = load_examples(crop=False)[0:2] - inlier_mask = ~self.outlines.create_mask(ref) + # Initiate coregistration + horizontal_coreg = coreg_method() + + # Copy dictionary and remove inlier mask + elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") + + # Create synthetic translation from the reference DEM + ref = self.ref + ref_shifted = ref.translate(shifts[0], shifts[1]) + shifts[2] + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_shifted = ref_shifted.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + elev_fit_args["to_be_aligned_elev"] = ref_shifted + + # Run coregistration + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + + # Check all fit parameters are the opposite of those used above, within a relative 1% (10% for ICP) + fit_shifts = [-horizontal_coreg.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] + + # ICP can be less precise than other methods + rtol = 10e-2 if coreg_method != coreg.ICP else 10e-1 + assert np.allclose(shifts, fit_shifts, rtol=rtol) + + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ( + ref.interp_points((ref_shifted.geometry.x.values, ref_shifted.geometry.y.values)) - ref_shifted["z"] + ) + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] + else: + init_dh = ref - ref_shifted.reproject(ref) + dh = ref - coreg_elev.reproject(ref) - # Run co-registration - nuth_kaab = xdem.coreg.NuthKaab(offset_threshold=0.005) - nuth_kaab.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - - # Check the output .metadata is always the same - shifts = ( - nuth_kaab.meta["outputs"]["affine"]["shift_x"], - nuth_kaab.meta["outputs"]["affine"]["shift_y"], - nuth_kaab.meta["outputs"]["affine"]["shift_z"], - ) - assert shifts == pytest.approx((-9.198341, -2.786257, -1.981793)) + # Plots for debugging + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt - def test_gradientdescending(self, subsample: int = 10000, verbose: bool = False) -> None: - """ - Test the co-registration outputs performed on the example are always the same. This overlaps with the test in - test_examples.py, but helps identify from where differences arise. + init_dh.plot() + plt.show() + dh.plot() + plt.show() + + # Check applying the coregistration removes 99% of the variance (95% for ICP) + # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity + tol = 0.01 if coreg_method != coreg.ICP else 0.05 + assert np.nanvar(dh / np.nanstd(init_dh)) < tol - It also implicitly tests the z_name kwarg and whether a geometry column can be provided instead of E/N cols. + @pytest.mark.parametrize( + "coreg_method__shift", + [ + (coreg.NuthKaab, (9.202739, 2.735573, -1.97733)), + (coreg.GradientDescending, (10.0, 2.5, -1.964539)), + (coreg.ICP, (8.73833, 1.584255, -1.943957)), + ], + ) # type: ignore + def test_coreg_translations__example( + self, coreg_method__shift: tuple[type[AffineCoreg], tuple[float, float, float]], verbose: bool = False + ) -> None: """ - # Use full DEMs here (to compare to original values from older package versions) + Test that the translation co-registration outputs are always exactly the same on the real example data. + """ + + # Use entire DEMs here (to compare to original values from older package versions) ref, tba = load_examples(crop=False)[0:2] inlier_mask = ~self.outlines.create_mask(ref) + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_shifts = coreg_method__shift + # Run co-registration - gds = xdem.coreg.GradientDescending(subsample=subsample) - gds.fit( - ref.to_pointcloud(data_column_name="z").ds, - tba, - inlier_mask=inlier_mask, - verbose=verbose, - random_state=42, - ) + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - shifts = ( - gds.meta["outputs"]["affine"]["shift_x"], - gds.meta["outputs"]["affine"]["shift_y"], - gds.meta["outputs"]["affine"]["shift_z"], - ) - assert shifts == pytest.approx((-10.625, -2.65625, 1.940031), abs=10e-5) + # Check the output translations match the exact values + shifts = [c.meta["outputs"]["affine"][k] for k in ["shift_x", "shift_y", "shift_z"]] # type: ignore + assert shifts == pytest.approx(expected_shifts) - @pytest.mark.parametrize("shift_px", [(1, 1), (2, 2)]) # type: ignore - @pytest.mark.parametrize("coreg_class", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore - @pytest.mark.parametrize("points_or_raster", ["raster", "points"]) - def test_coreg_example_shift(self, shift_px, coreg_class, points_or_raster, verbose=False, subsample=5000): + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("vshift", [0.2, 10.0, 1000.0]) # type: ignore + def test_coreg_vertical_translation__synthetic(self, fit_args, vshift) -> None: """ - For comparison of coreg algorithms: - Shift a ref_dem on purpose, e.g. shift_px = (1,1), and then applying coreg to shift it back. + Test the vertical shift coregistration with synthetic shifted data. These tests include VerticalShift. + + We test all combinaison of inputs: raster-raster, point-raster and raster-point. """ - res = self.ref.res[0] - # shift DEM by shift_px - shifted_ref = self.ref.copy() - shifted_ref.translate(shift_px[0] * res, shift_px[1] * res, inplace=True) + # Create a vertical shift correction instance + vshiftcorr = coreg.VerticalShift() - shifted_ref_points = shifted_ref.to_pointcloud(subsample=subsample, random_state=42).ds - shifted_ref_points.rename(columns={"b1": "z"}, inplace=True) + # Copy dictionary and remove inlier mask + elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") - kwargs = {} if coreg_class.__name__ != "GradientDescending" else {"subsample": subsample} + # Create synthetic vertical shift from the reference DEM + ref = self.ref + ref_vshifted = ref + vshift - coreg_obj = coreg_class(**kwargs) + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_vshifted = ref_vshifted.to_pointcloud(data_column_name="z", subsample=50000, random_state=42).ds + elev_fit_args["to_be_aligned_elev"] = ref_vshifted - if points_or_raster == "raster": - coreg_obj.fit(shifted_ref, self.ref, verbose=verbose, random_state=42) - elif points_or_raster == "points": - coreg_obj.fit(shifted_ref_points, self.ref, verbose=verbose, random_state=42) + # Fit the vertical shift model to the data + coreg_elev = vshiftcorr.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) - if coreg_class.__name__ == "ICP": - matrix = coreg_obj.to_matrix() - # The ICP fit only creates a matrix and doesn't normally show the alignment in pixels - # Since the test is formed to validate pixel shifts, these calls extract the approximate pixel shift - # from the matrix (it's not perfect since rotation/scale can change it). - coreg_obj.meta["outputs"]["affine"]["shift_x"] = -matrix[0][3] - coreg_obj.meta["outputs"]["affine"]["shift_y"] = -matrix[1][3] + # Check that the right vertical shift was found + assert vshiftcorr.meta["outputs"]["affine"]["shift_z"] == pytest.approx(-vshift, rel=10e-2) - # ICP can never be expected to be much better than 1px on structured data, as its implementation often finds a - # minimum between two grid points. This is clearly warned for in the documentation. - precision = 1e-2 if coreg_class.__name__ != "ICP" else 1 + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ( + ref.interp_points((ref_vshifted.geometry.x.values, ref_vshifted.geometry.y.values)) - ref_vshifted["z"] + ) + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] + else: + init_dh = ref - ref_vshifted + dh = ref - coreg_elev + + # Plots for debugging + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt + + init_dh.plot() + plt.show() + dh.plot() + plt.show() + + # Check that the median difference is zero, and that no additional variance + # was introduced, so that the variance is also close to zero (no variance for a constant vertical shift) + assert np.nanmedian(dh) == pytest.approx(0, abs=10e-6) + assert np.nanvar(dh) == pytest.approx(0, abs=10e-6) + + @pytest.mark.parametrize("coreg_method__vshift", [(coreg.VerticalShift, -2.305015)]) # type: ignore + def test_coreg_vertical_translation__example( + self, coreg_method__vshift: tuple[type[AffineCoreg], tuple[float, float, float]], verbose: bool = False + ) -> None: + """ + Test that the vertical translation co-registration output is always exactly the same on the real example data. + """ + + # Use entire DEMs here (to compare to original values from older package versions) + ref, tba = load_examples(crop=False)[0:2] + inlier_mask = ~self.outlines.create_mask(ref) - assert coreg_obj.meta["outputs"]["affine"]["shift_x"] == pytest.approx(-shift_px[0] * res, rel=precision) - assert coreg_obj.meta["outputs"]["affine"]["shift_y"] == pytest.approx(-shift_px[0] * res, rel=precision) + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_vshift = coreg_method__vshift - def test_nuth_kaab(self) -> None: + # Run co-registration + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - nuth_kaab = coreg.NuthKaab(max_iterations=10) + # Check the output translations match the exact values + vshift = c.meta["outputs"]["affine"]["shift_z"] + assert vshift == pytest.approx(expected_vshift) - # Synthesize a shifted and vertically offset DEM - pixel_shift = 2 - vshift = 5 - shifted_dem = self.ref.data.squeeze().copy() - shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift] - shifted_dem[:, :pixel_shift] = np.nan - shifted_dem += vshift - - # Fit the synthesized shifted DEM to the original - nuth_kaab.fit( - self.ref.data.squeeze(), - shifted_dem, - transform=self.ref.transform, - crs=self.ref.crs, - verbose=self.fit_params["verbose"], - ) + @pytest.mark.parametrize("fit_args", all_fit_args) # type: ignore + @pytest.mark.parametrize("shifts_rotations", [(20, 5, 0, 0.02, 0.05, 0.1), (-50, 100, 0, 10, 5, 4)]) # type: ignore + @pytest.mark.parametrize("coreg_method", [coreg.ICP]) # type: ignore + def test_coreg_rigid__synthetic(self, fit_args, shifts_rotations, coreg_method) -> None: + """ + Test the rigid coregistrations with synthetic misaligned (shifted and rotatedà data. These tests include ICP. - # Make sure that the estimated offsets are similar to what was synthesized. - res = self.ref.res[0] - assert nuth_kaab.meta["outputs"]["affine"]["shift_x"] == pytest.approx(pixel_shift * res, abs=0.03) - assert nuth_kaab.meta["outputs"]["affine"]["shift_y"] == pytest.approx(0, abs=0.03) - assert nuth_kaab.meta["outputs"]["affine"]["shift_z"] == pytest.approx(-vshift, 0.03) - - # Apply the estimated shift to "revert the DEM" to its original state. - unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs) - # Measure the difference (should be more or less zero) - diff = self.ref.data.squeeze() - unshifted_dem - diff = diff.compressed() # turn into a 1D array with only unmasked values - - # Check that the median is very close to zero - assert np.abs(np.median(diff)) < 0.01 - # Check that the RMSE is low - assert np.sqrt(np.mean(np.square(diff))) < 1 - - # Transform some arbitrary points. - transformed_points = nuth_kaab.apply(self.points) - - # Check that the x shift is close to the pixel_shift * image resolution - assert all( - abs((transformed_points.geometry.x.values - self.points.geometry.x.values) + pixel_shift * self.ref.res[0]) - < 0.1 + We test all combinaison of inputs: raster-raster, point-raster and raster-point. + + We verify that the matrix found by the coregistration is within 1% of the synthetic matrix, and inverted from + the one introduced, and that applying the coregistration to the misaligned elevations corrects more than + 95% of the variance from the initial elevation differences (hence, that the direction of coregistration has + to be the right one; and that there is no other errors introduced in the process). + """ + + # Initiate coregistration + horizontal_coreg = coreg_method() + + # Copy dictionary and remove inlier mask + elev_fit_args = fit_args.copy() + elev_fit_args.pop("inlier_mask") + + ref = self.ref + + # Create synthetic rigid transformation (translation and rotation) from the reference DEM + sr = np.array(shifts_rotations) + shifts = sr[:3] + rotations = sr[3:6] + e = np.deg2rad(rotations) + # Derive is a 3x3 rotation matrix + rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) + matrix = np.diag(np.ones(4, float)) + matrix[:3, :3] = rot_matrix + matrix[:3, 3] = shifts + + # Pass a centroid + centroid = [ref.bounds.left, ref.bounds.bottom, np.nanmean(ref)] + ref_shifted_rotated = coreg.apply_matrix(ref, matrix=matrix, centroid=centroid) + + # Convert to point cloud if input was point cloud + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + ref_shifted_rotated = ref_shifted_rotated.to_pointcloud( + data_column_name="z", subsample=50000, random_state=42 + ).ds + elev_fit_args["to_be_aligned_elev"] = ref_shifted_rotated + + # Run coregistration + coreg_elev = horizontal_coreg.fit_and_apply(**elev_fit_args, subsample=50000, random_state=42) + + # Check that fit matrix is the invert of those used above, within a relative 10% for rotations, and within + # a large 100% margin for shifts, as ICP has relative difficulty resolving shifts with large rotations + fit_matrix = horizontal_coreg.meta["outputs"]["affine"]["matrix"] + invert_fit_matrix = coreg.invert_matrix(fit_matrix) + invert_fit_shifts = invert_fit_matrix[:3, 3] + invert_fit_rotations = pytransform3d.rotations.euler_from_matrix( + invert_fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True ) - # Check that the z shift is close to the original vertical shift. - assert all(abs((transformed_points["z"].values - self.points["z"].values) + vshift) < 0.1) + invert_fit_rotations = np.rad2deg(invert_fit_rotations) + assert np.allclose(shifts, invert_fit_shifts, rtol=1) + assert np.allclose(rotations, invert_fit_rotations, rtol=10e-1) + + # For a point cloud output, need to interpolate with the other DEM to get dh + if isinstance(elev_fit_args["to_be_aligned_elev"], gpd.GeoDataFrame): + init_dh = ( + ref.interp_points((ref_shifted_rotated.geometry.x.values, ref_shifted_rotated.geometry.y.values)) + - ref_shifted_rotated["z"] + ) + dh = ref.interp_points((coreg_elev.geometry.x.values, coreg_elev.geometry.y.values)) - coreg_elev["z"] + else: + init_dh = ref - ref_shifted_rotated + dh = ref - coreg_elev + + # Plots for debugging + PLOT = False + if PLOT and isinstance(dh, geoutils.Raster): + import matplotlib.pyplot as plt + + init_dh.plot() + plt.show() + dh.plot() + plt.show() + + # Need to standardize by the elevation difference spread to avoid huge/small values close to infinity + # Only 95% of variance as ICP cannot always resolve the last shifts + assert np.nanvar(dh / np.nanstd(init_dh)) < 0.05 + + @pytest.mark.parametrize( + "coreg_method__shifts_rotations", + [(coreg.ICP, (8.738332, 1.584255, -1.943957, 0.0069004, -0.00703, -0.0119733))], + ) # type: ignore + def test_coreg_rigid__example( + self, + coreg_method__shifts_rotations: tuple[type[AffineCoreg], tuple[float, float, float]], + verbose: bool = False, + ) -> None: + """ + Test that the rigid co-registration outputs is always exactly the same on the real example data. + """ - def test_icp_opencv(self) -> None: + # Use entire DEMs here (to compare to original values from older package versions) + ref, tba = load_examples(crop=False)[0:2] + inlier_mask = ~self.outlines.create_mask(ref) - # Do a fast and dirty 3 iteration ICP just to make sure it doesn't error out. - icp = coreg.ICP(max_iterations=3) - icp.fit(**self.fit_params) + # Get the coregistration method and expected shifts from the inputs + coreg_method, expected_shifts_rots = coreg_method__shifts_rotations + + # Run co-registration + c = coreg_method(subsample=50000) + c.fit(ref, tba, inlier_mask=inlier_mask, verbose=verbose, random_state=42) - aligned_dem, _ = icp.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs) + # Check the output translations match the exact values + fit_matrix = c.meta["outputs"]["affine"]["matrix"] + fit_shifts = fit_matrix[:3, 3] + fit_rotations = pytransform3d.rotations.euler_from_matrix(fit_matrix[0:3, 0:3], i=0, j=1, k=2, extrinsic=True) + fit_rotations = np.rad2deg(fit_rotations) + fit_shifts_rotations = tuple(np.concatenate((fit_shifts, fit_rotations))) - assert aligned_dem.shape == self.ref.data.squeeze().shape + assert fit_shifts_rotations == pytest.approx(expected_shifts_rots, abs=10e-6) diff --git a/xdem/coreg/affine.py b/xdem/coreg/affine.py index ca4e58cc..700559fd 100644 --- a/xdem/coreg/affine.py +++ b/xdem/coreg/affine.py @@ -271,15 +271,18 @@ def sub_dh_interpolator(shift_x: float, shift_y: float) -> NDArrayf: def sub_dh_interpolator(shift_x: float, shift_y: float) -> NDArrayf: """Elevation difference interpolator for shifted coordinates of the subsample.""" - diff_rst_pts = pts_elev[z_name][sub_mask].values - rst_elev_interpolator( - (sub_coords[1] + shift_y, sub_coords[0] + shift_x) - ) - # Always return ref minus tba if ref == "point": - return diff_rst_pts + return pts_elev[z_name][sub_mask].values - rst_elev_interpolator( + (sub_coords[1] + shift_y, sub_coords[0] + shift_x) + ) + # Also invert the shift direction on the raster interpolator, so that the shift is the same relative to + # the reference (returns the right shift relative to the reference no matter if it is point or raster) else: - return -diff_rst_pts + return ( + rst_elev_interpolator((sub_coords[1] - shift_y, sub_coords[0] - shift_x)) + - pts_elev[z_name][sub_mask].values + ) # Interpolate arrays of bias variables to the subsample point coordinates if aux_vars is not None: @@ -689,10 +692,10 @@ def func_cost(offset: tuple[float, float]) -> float: errorcontrol=False, ) - # Get final offsets - offset_east = res.x[0] - offset_north = res.x[1] - offset_vertical = float(-np.nanmedian(dh_interpolator(offset_east, offset_north))) + # Get final offsets with the right sign direction + offset_east = -res.x[0] + offset_north = -res.x[1] + offset_vertical = float(np.nanmedian(dh_interpolator(-offset_east, -offset_north))) return offset_east, offset_north, offset_vertical @@ -1413,7 +1416,7 @@ def _fit_rst_pts( # Write output to class # (Mypy does not pass with normal dict, requires "OutAffineDict" here for some reason...) - output_affine = OutAffineDict(shift_x=easting_offset, shift_y=northing_offset, shift_z=vertical_offset) + output_affine = OutAffineDict(shift_x=-easting_offset, shift_y=-northing_offset, shift_z=vertical_offset) self._meta["outputs"]["affine"] = output_affine self._meta["outputs"]["random"] = {"subsample_final": subsample_final} @@ -1422,8 +1425,8 @@ def _to_matrix_func(self) -> NDArrayf: # We add a translation, on the last column matrix = np.diag(np.ones(4, dtype=float)) - matrix[0, 3] -= self._meta["outputs"]["affine"]["shift_x"] - matrix[1, 3] -= self._meta["outputs"]["affine"]["shift_y"] + matrix[0, 3] += self._meta["outputs"]["affine"]["shift_x"] + matrix[1, 3] += self._meta["outputs"]["affine"]["shift_y"] matrix[2, 3] += self._meta["outputs"]["affine"]["shift_z"] return matrix @@ -1443,7 +1446,7 @@ class GradientDescending(AffineCoreg): def __init__( self, x0: tuple[float, float] = (0, 0), - bounds: tuple[float, float] = (-3, 3), + bounds: tuple[float, float] = (-10, 10), deltainit: int = 2, deltatol: float = 0.004, feps: float = 0.0001, diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index f6ab079e..4bf96f5c 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -467,6 +467,12 @@ def _postprocess_coreg_apply_rst( # Resample the array on the original grid if resample: + + # TODO: Use this function for a translation only, for consistency with the rest of Coreg? + # (would require checking transform difference is only a translation) + # applied_elev = _reproject_horizontal_shift_samecrs(raster_arr=applied_elev, src_transform=out_transform, + # dst_transform=transform) + # Reproject the DEM from its out_transform onto the transform applied_rst = gu.Raster.from_array(applied_elev, out_transform, crs=crs, nodata=nodata) if not isinstance(elev, gu.Raster):