diff --git a/pandora/cost_volume_confidence/__init__.py b/pandora/cost_volume_confidence/__init__.py index 969576a..5a10167 100644 --- a/pandora/cost_volume_confidence/__init__.py +++ b/pandora/cost_volume_confidence/__init__.py @@ -1,6 +1,3 @@ -# pylint: disable=missing-module-docstring -# -# coding: utf8 # Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). # # This file is part of PANDORA @@ -27,5 +24,3 @@ from . import interval_bounds from . import std_intensity from . import risk - -# pylint: disable=missing-module-docstring diff --git a/pandora/cost_volume_confidence/ambiguity.py b/pandora/cost_volume_confidence/ambiguity.py index f280c52..fc13b54 100644 --- a/pandora/cost_volume_confidence/ambiguity.py +++ b/pandora/cost_volume_confidence/ambiguity.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# coding: utf8 -# # Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). # # This file is part of PANDORA @@ -68,6 +65,8 @@ def __init__(self, **cfg: str) -> None: self._eta_max = float(self.cfg["eta_max"]) self._eta_step = float(self.cfg["eta_step"]) self._indicator = self._method + str(self.cfg["indicator"]) + self._etas = np.arange(self._eta_min, self._eta_max, self._eta_step) + self._nbr_etas = self._etas.shape[0] def check_conf(self, **cfg: Union[str, float]) -> Dict[str, Union[str, float]]: """ @@ -130,11 +129,19 @@ def confidence_prediction( - confidence_measure 3D xarray.DataArray (row, col, indicator) """ + + grids = np.array( + [img_left["disparity"].sel(band_disp="min"), img_left["disparity"].sel(band_disp="max")], dtype=np.int64 + ) + # Get disparity intervals parameters + disparity_range = cv["disp"].data.astype(np.float32) # This silences numba's TBB threading layer warning with warnings.catch_warnings(): warnings.filterwarnings("ignore") # Computes ambiguity using numba in parallel for memory and computation time optimization - ambiguity = self.compute_ambiguity(cv["cost_volume"].data, self._eta_min, self._eta_max, self._eta_step) + ambiguity = self.compute_ambiguity( + cv["cost_volume"].data, self._etas, self._nbr_etas, grids, disparity_range + ) # If activated, ambiguity normalization with percentile if self._normalization: @@ -152,9 +159,9 @@ def normalize_with_percentile(self, ambiguity): Normalize ambiguity with percentile :param ambiguity: ambiguity - :type ambiguity: 2D np.array (row, col) dtype = float32 + :type ambiguity: 2D np.ndarray (row, col) dtype = float32 :return: the normalized ambiguity - :rtype: 2D np.array (row, col) dtype = float32 + :rtype: 2D np.ndarray (row, col) dtype = float32 """ norm_amb = np.copy(ambiguity) perc_min = np.percentile(norm_amb, self._percentile) @@ -165,121 +172,156 @@ def normalize_with_percentile(self, ambiguity): @staticmethod @njit( - "f4[:, :](f4[:, :, :], f4, f4, f4)", - parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")), + "f4[:, :](f4[:, :, :], f8[:], i8, i8[:, :, :],f4[:])", + parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "False")), cache=True, ) - def compute_ambiguity(cv: np.ndarray, _eta_min: float, _eta_max: float, _eta_step: float) -> np.ndarray: + def compute_ambiguity( + cv: np.ndarray, + etas: np.ndarray, + nbr_etas: int, + grids: np.ndarray, + disparity_range: np.ndarray, + ) -> np.ndarray: """ Computes ambiguity. 
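+        Ambiguity is the count, accumulated over all etas, of the disparities whose normalized
+        cost lies within eta of the normalized minimum cost: a flat cost curve yields a high
+        count, hence a high ambiguity (see the per-pixel loop below).
+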
:param cv: cost volume - :type cv: 3D np.array (row, col, disp) - :param _eta_min: minimal eta - :type _eta_min: float - :param _eta_max: maximal eta - :type _eta_max: float - :param _eta_step: eta step - :type _eta_step: float + :type cv: 3D np.ndarray (row, col, disp) + :param etas: range between eta_min and eta_max with step eta_step + :type etas: np.ndarray + :param nbr_etas: number of etas + :type nbr_etas: int + :param grids: array containing min and max disparity grids + :type grids: 2D np.ndarray (min, max) + :param disparity_range: array containing disparity range + :type disparity_range: np.ndarray :return: the normalized ambiguity - :rtype: 2D np.array (row, col) dtype = float32 + :rtype: 2D np.ndarray (row, col) dtype = float32 """ - # Miniumum and maximum of all costs, useful to normalize the cost volume + + # Minimum and maximum of all costs, useful to normalize the cost volume min_cost = np.nanmin(cv) max_cost = np.nanmax(cv) n_row, n_col, nb_disps = cv.shape - etas = np.arange(_eta_min, _eta_max, _eta_step) - # Numba does not support the np.tile operation two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten() # integral of ambiguity ambiguity = np.zeros((n_row, n_col), dtype=np.float32) + diff_cost = max_cost - min_cost + for row in prange(n_row): # pylint: disable=not-an-iterable for col in prange(n_col): # pylint: disable=not-an-iterable # Normalized minimum cost for one point - normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / (max_cost - min_cost) + normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / diff_cost # If all costs are at nan, set the maximum value of the ambiguity for this point if np.isnan(normalized_min_cost): - ambiguity[row, col] = etas.shape[0] * nb_disps + ambiguity[row, col] = nbr_etas * nb_disps else: - normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * etas.shape[0]) + idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col]) + idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1 + + normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas) # Normalized cost volume for one point - normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost) - #  Mask nan to -inf to increase the value of the ambiguity if a point contains nan costs - normalized_cv[np.isnan(normalized_cv)] = -np.inf - normalized_cv = np.repeat(normalized_cv, etas.shape[0]) + normalized_cv = (cv[row, col, :] - min_cost) / diff_cost - ambiguity[row, col] += np.sum(normalized_cv <= (normalized_min_cost + two_dim_etas)) + # Mask nan to -inf to increase the value of the ambiguity if a point contains nan costs + normalized_cv[idx_disp_min:idx_disp_max][ + np.isnan(normalized_cv[idx_disp_min:idx_disp_max]) + ] = -np.inf + + normalized_cv[:idx_disp_min][np.isnan(normalized_cv[:idx_disp_min])] = np.inf + normalized_cv[idx_disp_max:][np.isnan(normalized_cv[idx_disp_max:])] = np.inf + + normalized_cv = np.repeat(normalized_cv, nbr_etas) + + ambiguity[row, col] += np.nansum(normalized_cv <= (normalized_min_cost + two_dim_etas)) return ambiguity @staticmethod @njit( - "Tuple((f4[:, :],f4[:, :, :]))(f4[:, :, :], f4, f4, f4)", - parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")), + "Tuple((f4[:, :],f4[:, :, :]))(f4[:, :, :], f8[:], i8, i8[:, :, :], f4[:])", + parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "False")), cache=True, ) - def compute_ambiguity_and_sampled_ambiguity(cv: np.ndarray, _eta_min: float, _eta_max: float, _eta_step: float): + def 
compute_ambiguity_and_sampled_ambiguity(
+        cv: np.ndarray,
+        etas: np.ndarray,
+        nbr_etas: int,
+        grids: np.ndarray,
+        disparity_range: np.ndarray,
+    ):
         """
         Return the ambiguity and sampled ambiguity, useful for evaluating ambiguity in notebooks

         :param cv: cost volume
-        :type cv: 3D np.array (row, col, disp)
-        :param _eta_min: minimal eta
-        :type _eta_min: float
-        :param _eta_max: maximal eta
-        :type _eta_max: float
-        :param _eta_step: eta step
-        :type _eta_step: float
+        :type cv: 3D np.ndarray (row, col, disp)
+        :param etas: range between eta_min and eta_max with step eta_step
+        :type etas: np.ndarray
+        :param nbr_etas: number of etas
+        :type nbr_etas: int
+        :param grids: array containing min and max disparity grids
+        :type grids: 2D np.ndarray (min, max)
+        :param disparity_range: array containing disparity range
+        :type disparity_range: np.ndarray
         :return: the normalized ambiguity and sampled ambiguity
-        :rtype: Tuple(2D np.array (row, col) dtype = float32, 3D np.array (row, col) dtype = float32)
+        :rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 3D np.ndarray (row, col) dtype = float32)
         """
-        #  Miniumum and maximum of all costs, useful to normalize the cost volume
+        # Minimum and maximum of all costs, useful to normalize the cost volume
         min_cost = np.nanmin(cv)
         max_cost = np.nanmax(cv)

         n_row, n_col, nb_disps = cv.shape

-        etas = np.arange(_eta_min, _eta_max, _eta_step)
-
         # Numba does not support the np.tile operation
         two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten()

         # integral of ambiguity
         ambiguity = np.zeros((n_row, n_col), dtype=np.float32)
-        sampled_ambiguity = np.zeros((n_row, n_col, etas.shape[0]), dtype=np.float32)
+        sampled_ambiguity = np.zeros((n_row, n_col, nbr_etas), dtype=np.float32)
+
+        diff_cost = max_cost - min_cost

         for row in prange(n_row):  # pylint: disable=not-an-iterable
             for col in prange(n_col):  # pylint: disable=not-an-iterable
                 # Normalized minimum cost for one point
-                normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / (max_cost - min_cost)
+                normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / diff_cost
                 # If all costs are at nan, set the maximum value of the ambiguity for this point
                 if np.isnan(normalized_min_cost):
-                    ambiguity[row, col] = etas.shape[0] * nb_disps
+                    ambiguity[row, col] = nbr_etas * nb_disps
                     sampled_ambiguity[row, col, :] = nb_disps
                 else:
-                    normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * etas.shape[0])
+                    normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas)
                     # Normalized cost volume for one point
                     normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost)

-                    #  Mask nan to -inf to increase the value of the ambiguity if a point contains nan costs
-                    normalized_cv[np.isnan(normalized_cv)] = -np.inf
-                    normalized_cv = np.repeat(normalized_cv, etas.shape[0])
+                    idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col])
+                    idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1
+
+                    # Mask nan to -inf to increase the value of the ambiguity if a point contains nan costs
+                    normalized_cv[idx_disp_min:idx_disp_max][
+                        np.isnan(normalized_cv[idx_disp_min:idx_disp_max])
+                    ] = -np.inf
+                    normalized_cv[:idx_disp_min][np.isnan(normalized_cv[:idx_disp_min])] = np.inf
+                    normalized_cv[idx_disp_max:][np.isnan(normalized_cv[idx_disp_max:])] = np.inf
+
+                    normalized_cv = np.repeat(normalized_cv, nbr_etas)

                     # fill integral ambiguity
-                    ambiguity[row, col] += np.sum(normalized_cv <= (normalized_min_cost + two_dim_etas))
+                    ambiguity[row, col] += 
np.nansum(normalized_cv <= (normalized_min_cost + two_dim_etas)) # fill sampled ambiguity costs_comparison = normalized_cv <= (normalized_min_cost + two_dim_etas) - costs_comparison = costs_comparison.reshape((nb_disps, etas.shape[0])) + costs_comparison = costs_comparison.reshape((nb_disps, nbr_etas)) sampled_ambiguity[row, col, :] = np.sum(costs_comparison, axis=0) return ambiguity, sampled_ambiguity diff --git a/pandora/cost_volume_confidence/cost_volume_confidence.py b/pandora/cost_volume_confidence/cost_volume_confidence.py index a132779..84c1d38 100644 --- a/pandora/cost_volume_confidence/cost_volume_confidence.py +++ b/pandora/cost_volume_confidence/cost_volume_confidence.py @@ -119,12 +119,12 @@ def allocate_confidence_map( ) -> Tuple[xr.Dataset, xr.Dataset]: """ Create or update the confidence measure : confidence_measure (xarray.DataArray of the cost volume and the - disparity map) by adding a the indicator + disparity map) by adding an indicator :param name_confidence_measure: the name of the new confidence indicator :type name_confidence_measure: string - :param confidence_map: the condidence map - :type confidence_map: 2D np.array (row, col) dtype=np.float32 + :param confidence_map: the confidence map + :type confidence_map: 2D np.ndarray (row, col) dtype=np.float32 :param disp: the disparity map dataset or None :type disp: xarray.Dataset or None :param cv: cost volume dataset @@ -147,7 +147,7 @@ def allocate_confidence_map( conf_measure = np.full((nb_row, nb_col, nb_indicator + 1), np.nan, dtype=np.float32) # old confidence measures conf_measure[:, :, :-1] = cv["confidence_measure"].data - #  new confidence measure + # new confidence measure conf_measure[:, :, -1] = confidence_map indicator = np.copy(cv.coords["indicator"]) diff --git a/pandora/cost_volume_confidence/interval_bounds.py b/pandora/cost_volume_confidence/interval_bounds.py index 97874aa..d711ee9 100644 --- a/pandora/cost_volume_confidence/interval_bounds.py +++ b/pandora/cost_volume_confidence/interval_bounds.py @@ -138,7 +138,7 @@ def confidence_prediction( cv: xr.Dataset = None, ) -> Tuple[xr.Dataset, xr.Dataset]: """ - Computes a confidence measure that evaluates the minimum and maximim disparity at + Computes a confidence measure that evaluates the minimum and maximum disparity at each point with a confidence of possibility_threshold % :param disp: the disparity map dataset @@ -216,7 +216,7 @@ def compute_interval_bounds( :param type_factor: Either 1 or -1. 
Used to adapt the possibility computation to max or min measures :type type_factor: float :return: the infimum and supremum (not regularized) of the set containing the true disparity - :rtype: Tuple(2D np.array (row, col) dtype = float32, 2D np.array (row, col) dtype = float32) + :rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32) """ # IDEA: instead of transforming the cost curve into a possibility, we can compute the cost # threshold T to apply directly to the cost curve to obtain the same result diff --git a/pandora/cost_volume_confidence/risk.py b/pandora/cost_volume_confidence/risk.py index 1e930fb..15745c3 100644 --- a/pandora/cost_volume_confidence/risk.py +++ b/pandora/cost_volume_confidence/risk.py @@ -68,6 +68,8 @@ def __init__(self, **cfg: str) -> None: self._eta_max = float(self.cfg["eta_max"]) self._indicator_max = self._method_max + str(self.cfg["indicator"]) self._indicator_min = self._method_min + str(self.cfg["indicator"]) + self._etas = np.arange(self._eta_min, self._eta_max, self._eta_step) + self._nbr_etas = self._etas.shape[0] def check_conf(self, **cfg: Union[str, float]) -> Dict[str, Union[str, float]]: """ @@ -125,6 +127,12 @@ def confidence_prediction( and 'risk_min_confidence' in the :rtype: Tuple(xarray.Dataset, xarray.Dataset) """ + + grids = np.array( + [img_left["disparity"].sel(band_disp="min"), img_left["disparity"].sel(band_disp="max")], dtype=np.int64 + ) + # Get disparity intervals parameters + disparity_range = cv["disp"].data.astype(np.float32) # This silences numba's TBB threading layer warning with warnings.catch_warnings(): warnings.filterwarnings("ignore") @@ -133,11 +141,16 @@ def confidence_prediction( **{"confidence_method": "ambiguity"} # type: ignore ) _, sampled_ambiguity = ambiguity_.compute_ambiguity_and_sampled_ambiguity( # type: ignore - cv["cost_volume"].data, self._eta_min, self._eta_max, self._eta_step + cv["cost_volume"].data, self._etas, self._nbr_etas, grids, disparity_range ) # Computes risk using numba in parallel for memory and computation time optimization risk_max, risk_min = self.compute_risk( - cv["cost_volume"].data, sampled_ambiguity, self._eta_min, self._eta_max, self._eta_step + cv["cost_volume"].data, + sampled_ambiguity, + self._etas, + self._nbr_etas, + grids, + disparity_range, ) disp, cv = self.allocate_confidence_map(self._indicator_max, risk_max, disp, cv) @@ -147,37 +160,42 @@ def confidence_prediction( @staticmethod @njit( - "Tuple((f4[:, :],f4[:, :]))(f4[:, :, :], f4[:, :, :], f4, f4, f4)", + "Tuple((f4[:, :],f4[:, :]))(f4[:, :, :], f4[:, :, :], f8[:], i8, i8[:, :, :], f4[:])", parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")), cache=True, ) def compute_risk( - cv: np.ndarray, sampled_ambiguity: np.ndarray, _eta_min: float, _eta_max: float, _eta_step: float + cv: np.ndarray, + sampled_ambiguity: np.ndarray, + etas: np.ndarray, + nbr_etas: int, + grids: np.ndarray, + disparity_range: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Computes minimum and maximum risk. 
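+        Risk max is defined as the mean over all etas of max(di) - min(di), where the di are the
+        disparities whose normalized cost lies in [cmin; cmin + eta[; risk min is the mean over
+        all etas of (1 + max(di) - min(di)) - sampled_ambiguity.
+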
:param cv: cost volume - :type cv: 3D np.array (row, col, disp) + :type cv: 3D np.ndarray (row, col, disp) :param sampled_ambiguity: sampled cost volume ambiguity - :type sampled_ambiguity: 3D np.array (row, col, eta) - :param _eta_min: minimal eta - :type _eta_min: float - :param _eta_max: maximal eta - :type _eta_max: float - :param _eta_step: eta step - :type _eta_step: float + :type sampled_ambiguity: 3D np.ndarray (row, col, eta) + :param etas: range between eta_min and eta_max with step eta_step + :type etas: np.ndarray + :param nbr_etas: number of etas + :type nbr_etas: int + :param grids: array containing min and max disparity grids + :type grids: 2D np.ndarray (min, max) + :param disparity_range: array containing disparity range + :type disparity_range: np.ndarray :return: the minimum and maximum risk - :rtype: Tuple(2D np.array (row, col) dtype = float32, 2D np.array (row, col) dtype = float32) + :rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32) """ - #  Miniumum and maximum of all costs, useful to normalize the cost volume + # Minimum and maximum of all costs, useful to normalize the cost volume min_cost = np.nanmin(cv) max_cost = np.nanmax(cv) n_row, n_col, nb_disps = cv.shape - etas = np.arange(_eta_min, _eta_max, _eta_step) - # Numba does not support the np.tile operation two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten() @@ -195,24 +213,31 @@ def compute_risk( risk_max[row, col] = np.nan risk_min[row, col] = np.nan else: - normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * etas.shape[0]) + normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas) # Normalized cost volume for one point normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost) - #  Mask nan to -inf to later discard values out of [min; min + eta] - normalized_cv[np.isnan(normalized_cv)] = -np.inf - normalized_cv = np.repeat(normalized_cv, etas.shape[0]) - # Initialize all disparties - disp_cv = np.arange(nb_disps) * 1.0 - disp_cv = np.repeat(disp_cv, etas.shape[0]) - # Remove disparities for every similarity value outside of [min;min+eta[ + # Mask nan to -inf to later discard values out of [min; min + eta] + + idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col]) + idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1 + + normalized_cv[idx_disp_min:idx_disp_max][ + np.isnan(normalized_cv[idx_disp_min:idx_disp_max]) + ] = -np.inf + + normalized_cv = np.repeat(normalized_cv, nbr_etas) + # Initialize all disparities + disp_cv = np.arange(nb_disps, dtype=np.float32) + disp_cv = np.repeat(disp_cv, nbr_etas) + # Remove disparities for every similarity value outside [min;min+eta[ disp_cv[normalized_cv > (normalized_min_cost + two_dim_etas)] = np.nan # Reshape to distinguish each sample's disparity range - disp_cv = disp_cv.reshape((nb_disps, etas.shape[0])) + disp_cv = disp_cv.reshape((nb_disps, nbr_etas)) # Initialize minimum and maximum disparities - min_disp = np.zeros(etas.shape[0]) - max_disp = np.zeros(etas.shape[0]) + min_disp = np.zeros(nbr_etas) + max_disp = np.zeros(nbr_etas) # Obtain min and max disparities for each sample - for i in range(etas.shape[0]): + for i in range(nbr_etas): min_disp[i] = np.nanmin(disp_cv[:, i]) max_disp[i] = np.nanmax(disp_cv[:, i]) # fill mean risks @@ -224,38 +249,43 @@ def compute_risk( @staticmethod @njit( - "Tuple((f4[:, :],f4[:, :],f4[:, :, :],f4[:, :, :]))(f4[:, :, :], f4[:, :, :], f4, f4, f4)", + "Tuple((f4[:, :],f4[:, :],f4[:, 
:, :],f4[:, :, :]))(f4[:, :, :], f4[:, :, :], f8[:], i8, i8[:, :, :], f4[:])",
         parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")),
         cache=True,
     )
     def compute_risk_and_sampled_risk(
-        cv: np.ndarray, sampled_ambiguity: np.ndarray, _eta_min: float, _eta_max: float, _eta_step: float
+        cv: np.ndarray,
+        sampled_ambiguity: np.ndarray,
+        etas: np.ndarray,
+        nbr_etas: int,
+        grids: np.ndarray,
+        disparity_range: np.ndarray,
     ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
         """
         Computes minimum and maximum risk and sampled_risk.

         :param cv: cost volume
-        :type cv: 3D np.array (row, col, disp)
+        :type cv: 3D np.ndarray (row, col, disp)
         :param sampled_ambiguity: sampled cost volume ambiguity
-        :type sampled_ambiguity: 3D np.array (row, col, eta)
-        :param _eta_min: minimal eta
-        :type _eta_min: float
-        :param _eta_max: maximal eta
-        :type _eta_max: float
-        :param _eta_step: eta step
-        :type _eta_step: float
+        :type sampled_ambiguity: 3D np.ndarray (row, col, eta)
+        :param etas: range between eta_min and eta_max with step eta_step
+        :type etas: np.ndarray
+        :param nbr_etas: number of etas
+        :type nbr_etas: int
+        :param grids: array containing min and max disparity grids
+        :type grids: 2D np.ndarray (min, max)
+        :param disparity_range: array containing disparity range
+        :type disparity_range: np.ndarray
         :return: the risk
-        :rtype: Tuple(2D np.array (row, col) dtype = float32, 2D np.array (row, col) dtype = float32,
-        3D np.array (row, col) dtype = float32, 3D np.array (row, col) dtype = float32)
+        :rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32,
+        3D np.ndarray (row, col) dtype = float32, 3D np.ndarray (row, col) dtype = float32)
         """
-        #  Miniumum and maximum of all costs, useful to normalize the cost volume
+        # Minimum and maximum of all costs, useful to normalize the cost volume
         min_cost = np.nanmin(cv)
         max_cost = np.nanmax(cv)

         n_row, n_col, nb_disps = cv.shape

-        etas = np.arange(_eta_min, _eta_max, _eta_step)
-
         # Numba does not support the np.tile operation
         two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten()

@@ -263,8 +293,8 @@ def compute_risk_and_sampled_risk(
         risk_max = np.zeros((n_row, n_col), dtype=np.float32)
         risk_min = np.zeros((n_row, n_col), dtype=np.float32)
         # Initialize min and max sampled risks
-        sampled_risk_min = np.zeros((n_row, n_col, etas.shape[0]), dtype=np.float32)
-        sampled_risk_max = np.zeros((n_row, n_col, etas.shape[0]), dtype=np.float32)
+        sampled_risk_min = np.zeros((n_row, n_col, nbr_etas), dtype=np.float32)
+        sampled_risk_max = np.zeros((n_row, n_col, nbr_etas), dtype=np.float32)

         for row in prange(n_row):  # pylint: disable=not-an-iterable
             for col in prange(n_col):  # pylint: disable=not-an-iterable
@@ -278,24 +308,33 @@ def compute_risk_and_sampled_risk(
                     risk_max[row, col] = np.nan
                     risk_min[row, col] = np.nan
                 else:
-                    normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * etas.shape[0])
+                    normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas)
                     # Normalized cost volume for one point
                     normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost)
-                    #  Mask nan to -inf to later discard values out of [min; min + eta]
-                    normalized_cv[np.isnan(normalized_cv)] = -np.inf
-                    normalized_cv = np.repeat(normalized_cv, etas.shape[0])
-                    # Initialize all disparties
-                    disp_cv = np.arange(nb_disps) * 1.0
-                    disp_cv = np.repeat(disp_cv, etas.shape[0])
-                    # Remove disparities for every similarity value outside of [min;min+eta[
+                    # Mask nan to -inf to later discard 
values out of [min; min + eta] + idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col]) + idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1 + + normalized_cv[idx_disp_min:idx_disp_max][ + np.isnan(normalized_cv[idx_disp_min:idx_disp_max]) + ] = -np.inf + normalized_cv[:idx_disp_min][np.isnan(normalized_cv[:idx_disp_min])] = np.nan + normalized_cv[idx_disp_max:][np.isnan(normalized_cv[idx_disp_max:])] = np.nan + + normalized_cv = np.repeat(normalized_cv, nbr_etas) + + # Initialize all disparities + disp_cv = np.arange(nb_disps, dtype=np.float32) + disp_cv = np.repeat(disp_cv, nbr_etas) + # Remove disparities for every similarity value outside [min;min+eta[ disp_cv[normalized_cv > (normalized_min_cost + two_dim_etas)] = np.nan # Reshape to distinguish each sample's disparity range - disp_cv = disp_cv.reshape((nb_disps, etas.shape[0])) + disp_cv = disp_cv.reshape((nb_disps, nbr_etas)) # Initialize minimum and maximum disparities - min_disp = np.zeros(etas.shape[0]) - max_disp = np.zeros(etas.shape[0]) + min_disp = np.zeros(nbr_etas) + max_disp = np.zeros(nbr_etas) # Obtain min and max disparities for each sample - for i in range(etas.shape[0]): + for i in range(nbr_etas): min_disp[i] = np.nanmin(disp_cv[:, i]) max_disp[i] = np.nanmax(disp_cv[:, i]) # fill sampled max risk diff --git a/tests/test_confidence.py b/tests/test_confidence.py deleted file mode 100644 index 7e2b3a7..0000000 --- a/tests/test_confidence.py +++ /dev/null @@ -1,836 +0,0 @@ -# type:ignore -#!/usr/bin/env python -# coding: utf8 -# -# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). -# -# This file is part of PANDORA -# -# https://github.com/CNES/Pandora_pandora -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -This module contains functions to test the confidence module. -""" - -import unittest - -import numpy as np -import xarray as xr -from rasterio import Affine - -import pandora -import pandora.cost_volume_confidence as confidence -from pandora import matching_cost -from pandora.state_machine import PandoraMachine -from pandora.img_tools import add_disparity -from pandora.criteria import validity_mask - - -class TestConfidence(unittest.TestCase): - """ - TestConfidence class allows to test the confidence module - """ - - def setUp(self): - """ - Method called to prepare the test fixture - - """ - - @staticmethod - def test_ambiguity(): - """ - Test the ambiguity method - - """ - cv_ = np.array( - [[[np.nan, 1, 3], [4, 1, 1], [1.2, 1, 2]], [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]]], - dtype=np.float32, - ) - - cv_ = xr.Dataset( - {"cost_volume": (["row", "col", "disp"], cv_)}, coords={"row": [0, 1], "col": [0, 1, 2], "disp": [-1, 0, 1]} - ) - - ambiguity_ = confidence.AbstractCostVolumeConfidence( - **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1} - ) - - # Apply median filter to the disparity map. Median filter is only applied on valid pixels. 
- ambiguity_.confidence_prediction(None, None, None, cv_) - - # Ambiguity integral not normalized - amb_int = np.array([[4.0, 4.0, 3.0], [6.0, 6.0, 6.0]]) # pylint: disable=unused-variable - # Normalized ambiguity - amb_int = np.array([[(4.0 - 3.05) / (6.0 - 3.05), (4.0 - 3.05) / (6.0 - 3.05), 0], [1.0, 1.0, 1.0]]) - #  Ambiguity to confidence measure - ambiguity_ground_truth = 1 - amb_int - - # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(cv_["confidence_measure"].data[:, :, 0], ambiguity_ground_truth, rtol=1e-06) - - @staticmethod - def test_ambiguity_without_normalization(): - """ - Test the ambiguity method - - """ - cv_ = np.array( - [[[np.nan, 1, 3], [4, 1, 1], [1.2, 1, 2]], [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]]], - dtype=np.float32, - ) - - cv_ = xr.Dataset( - {"cost_volume": (["row", "col", "disp"], cv_)}, coords={"row": [0, 1], "col": [0, 1, 2], "disp": [-1, 0, 1]} - ) - - ambiguity_ = confidence.AbstractCostVolumeConfidence( - **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1, "normalization": False} - ) - - # Apply median filter to the disparity map. Median filter is only applied on valid pixels. - ambiguity_.confidence_prediction(None, None, None, cv_) - - # Ambiguity integral not normalized - amb_int_not_normalized = np.array([[4.0, 4.0, 3.0], [6.0, 6.0, 6.0]]) - #  Ambiguity to confidence measure - ambiguity_ground_truth = 1 - amb_int_not_normalized - - # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(cv_["confidence_measure"].data[:, :, 0], ambiguity_ground_truth, rtol=1e-06) - - @staticmethod - def test_ambiguity_std_full_pipeline(): - """ - Test the ambiguity and std_intensity methods using the pandora run method - - """ - # Create left and right images - left_data = np.array([[2, 5, 3, 1], [5, 3, 2, 1], [4, 2, 3, 2], [4, 5, 3, 2]], dtype=np.float32) - mask_ = np.array([[0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 1]], dtype=np.int16) - left_im = xr.Dataset( - {"im": (["row", "col"], left_data), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(left_data.shape[0]), "col": np.arange(left_data.shape[1])}, - ) - # Add image conf to the image dataset - left_im.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - left_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - right_data = np.array([[1, 2, 1, 2], [2, 3, 5, 3], [0, 2, 4, 2], [5, 3, 1, 4]], dtype=np.float32) - mask_ = np.array([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=np.int16) - right_im = xr.Dataset( - {"im": (["row", "col"], right_data), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(right_data.shape[0]), "col": np.arange(right_data.shape[1])}, - ) - # Add image conf to the image dataset - right_im.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - right_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - user_cfg = { - "input": {"disp_left": (-1, 1)}, - "pipeline": { - "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, - "cost_volume_confidence": {"confidence_method": "std_intensity"}, - "cost_volume_confidence.2": {"confidence_method": "ambiguity", "eta_max": 0.3, "eta_step": 0.25}, - "disparity": {"disparity_method": "wta"}, - 
"filter": {"filter_method": "median"}, - }, - } - pandora_machine = PandoraMachine() - - # Update the user configuration with default values - cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) - - # Run the pandora pipeline - left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) - - assert ( - np.sum(left.coords["indicator"].data != ["confidence_from_intensity_std", "confidence_from_ambiguity.2"]) - == 0 - ) - - # ----- Check ambiguity results ------ - - # Cost volume Normalized cost volume - # [[[nan 1. 0.] [[[ nan 0.25 0. ] - # [ 4. 3. 4.] [1. 0.75 1. ] - # [ 1. 2. 1.] [0.25 0.5 0.25] - # [ 0. 1. nan]] [0. 0.25 nan]] - # [[nan 3. 2.] [[ nan 0.75 0.5 ] - # [nan nan nan] [ nan nan nan] - # [ 1. 3. 1.] [0.25 0.75 0.25] - # [ 4. 2. nan]] [1. 0.5 nan]] - # [[nan 4. 2.] [[ nan 1. 0.5 ] - # [ 2. 0. 2.] [0.5 0. 0.5 ] - # [ 1. 1. 1.] [0.25 0.25 0.25] - # [ 2. 0. nan]] [0.5 0. nan]] - # [[nan 1. 1.] [[ nan 0.25 0.25] - # [ 0. 2. 4.] [0. 0.5 1. ] - # [ 0. 2. 1.] [0. 0.5 0.25] - # [nan nan nan]]] [ nan nan nan]]] - # - # Ambiguity integral not normalized - amb_int = np.array([[5.0, 4.0, 5.0, 5.0], [5.0, 6.0, 4.0, 4.0], [4.0, 2.0, 6.0, 4.0], [6.0, 2.0, 3.0, 6.0]]) - # Normalized ambiguity - amb_int = np.array( - [ - [(5.0 - 2) / 4.0, (4.0 - 2) / 4.0, (5.0 - 2) / 4.0, (5.0 - 2) / 4.0], - [(5.0 - 2) / 4.0, (6.0 - 2) / 4.0, (4.0 - 2) / 4.0, (4.0 - 2) / 4.0], - [(4.0 - 2) / 4.0, (2.0 - 2) / 4.0, (6.0 - 2) / 4.0, (4.0 - 2) / 4.0], - [(6.0 - 2) / 4.0, (2.0 - 2) / 4.0, (3.0 - 2) / 4.0, (6.0 - 2) / 4.0], - ] - ) - #  Ambiguity to confidence measure - ambiguity_ground_truth = 1 - amb_int - - # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], ambiguity_ground_truth, rtol=1e-06) - - # ----- Check std_intensity results ------ - std_intensity_gt = np.array( - [[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]] - ) - # Check if the calculated std_intensity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_array_equal(left["confidence_measure"].data[:, :, 0], std_intensity_gt) - - @staticmethod - def test_non_normalized_ambiguity_std_full_pipeline(): - """ - Test the non normalized ambiguity and std_intensity methods using the pandora run method - - """ - # Create left and right images - left_data = np.array([[2, 5, 3, 1], [5, 3, 2, 1], [4, 2, 3, 2], [4, 5, 3, 2]], dtype=np.float32) - mask_ = np.array([[0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 1]], dtype=np.int16) - left_im = xr.Dataset( - {"im": (["row", "col"], left_data), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(left_data.shape[0]), "col": np.arange(left_data.shape[1])}, - ) - # Add image conf to the image dataset - left_im.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - left_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - right_data = np.array([[1, 2, 1, 2], [2, 3, 5, 3], [0, 2, 4, 2], [5, 3, 1, 4]], dtype=np.float32) - mask_ = np.array([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=np.int16) - right_im = xr.Dataset( - {"im": (["row", "col"], right_data), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(right_data.shape[0]), "col": np.arange(right_data.shape[1])}, - ) - # Add image conf to the image dataset - right_im.attrs = { - 
"no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - right_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - user_cfg = { - "input": {"disp_left": (-1, 1)}, - "pipeline": { - "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, - "cost_volume_confidence": {"confidence_method": "std_intensity"}, - "cost_volume_confidence.2": { - "confidence_method": "ambiguity", - "eta_max": 0.3, - "eta_step": 0.25, - "normalization": False, - }, - "disparity": {"disparity_method": "wta"}, - "filter": {"filter_method": "median"}, - }, - } - pandora_machine = PandoraMachine() - - # Update the user configuration with default values - cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) - - # Run the pandora pipeline - left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) - - assert ( - np.sum(left.coords["indicator"].data != ["confidence_from_intensity_std", "confidence_from_ambiguity.2"]) - == 0 - ) - - # ----- Check ambiguity results ------ - - # Cost volume Normalized cost volume - # [[[nan 1. 0.] [[[ nan 0.25 0. ] - # [ 4. 3. 4.] [1. 0.75 1. ] - # [ 1. 2. 1.] [0.25 0.5 0.25] - # [ 0. 1. nan]] [0. 0.25 nan]] - # [[nan 3. 2.] [[ nan 0.75 0.5 ] - # [nan nan nan] [ nan nan nan] - # [ 1. 3. 1.] [0.25 0.75 0.25] - # [ 4. 2. nan]] [1. 0.5 nan]] - # [[nan 4. 2.] [[ nan 1. 0.5 ] - # [ 2. 0. 2.] [0.5 0. 0.5 ] - # [ 1. 1. 1.] [0.25 0.25 0.25] - # [ 2. 0. nan]] [0.5 0. nan]] - # [[nan 1. 1.] [[ nan 0.25 0.25] - # [ 0. 2. 4.] [0. 0.5 1. ] - # [ 0. 2. 1.] [0. 0.5 0.25] - # [nan nan nan]]] [ nan nan nan]]] - # - # Ambiguity integral not normalized - amb_int_not_normalized = np.array( - [[5.0, 4.0, 5.0, 5.0], [5.0, 6.0, 4.0, 4.0], [4.0, 2.0, 6.0, 4.0], [6.0, 2.0, 3.0, 6.0]] - ) - - #  Ambiguity to confidence measure - ambiguity_ground_truth = 1 - amb_int_not_normalized - - # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], ambiguity_ground_truth, rtol=1e-06) - - # ----- Check std_intensity results ------ - std_intensity_gt = np.array( - [[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]] - ) - # Check if the calculated std_intensity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_array_equal(left["confidence_measure"].data[:, :, 0], std_intensity_gt) - - @staticmethod - def test_std_intensity(): - """ - Test the confidence measure std_intensity - """ - # Create a stereo object - left_data = np.array( - ([1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 2, 1], [1, 1, 1, 4, 3, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]), - dtype=np.float32, - ) - left = xr.Dataset( - {"im": (["row", "col"], left_data)}, - coords={"row": np.arange(left_data.shape[0]), "col": np.arange(left_data.shape[1])}, - ) - left.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - left.pipe(add_disparity, disparity=[-2, 1], window=None) - - right_data = np.array( - ([1, 1, 1, 2, 2, 2], [1, 1, 1, 4, 2, 4], [1, 1, 1, 4, 4, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]), - dtype=np.float64, - ) - right = xr.Dataset( - {"im": (["row", "col"], right_data)}, - coords={"row": np.arange(right_data.shape[0]), "col": np.arange(right_data.shape[1])}, - ) - right.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - 
"no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - - # create matching_cost object - stereo_matcher = matching_cost.AbstractMatchingCost( - **{"matching_cost_method": "sad", "window_size": 3, "subpix": 1} - ) - - # Compute bright standard deviation inside a window of size 3 and create the confidence measure - std_bright_ground_truth = np.array( - [ - [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(92 / 81), np.sqrt(92 / 81), np.nan], - [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], - ], - dtype=np.float32, - ) - - std_bright_ground_truth = std_bright_ground_truth.reshape((5, 6, 1)) - - # compute with compute_cost_volume - grid = stereo_matcher.allocate_cost_volume( - left, (left["disparity"].sel(band_disp="min"), left["disparity"].sel(band_disp="max")) - ) - - # Compute validity mask - grid = validity_mask(left, right, grid) - - cv = stereo_matcher.compute_cost_volume(img_left=left, img_right=right, cost_volume=grid) - stereo_matcher.cv_masked( - left, - right, - cv, - left["disparity"].sel(band_disp="min"), - left["disparity"].sel(band_disp="max"), - ) - - std_intensity = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "std_intensity"}) - - # Compute confidence prediction - _, cv_with_intensity = std_intensity.confidence_prediction(None, left, right, cv) - - # Check if the calculated confidence_measure is equal to the ground truth (same shape and all elements equals) - assert np.sum(cv_with_intensity.coords["indicator"].data != ["confidence_from_intensity_std"]) == 0 - np.testing.assert_array_equal(cv_with_intensity["confidence_measure"].data, std_bright_ground_truth) - - @staticmethod - def test_std_intensity_multiband(): - """ - Test the confidence measure std_intensity with multiband input images - """ - # Create a stereo object - left_data = np.array( - [ - [[0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 2, 1], [0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]], - [[1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 2, 1], [1, 1, 1, 4, 3, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]], - ], - dtype=np.float32, - ) - left = xr.Dataset( - {"im": (["band_im", "row", "col"], left_data)}, - coords={ - "band_im": ["r", "g"], - "row": np.arange(left_data.shape[1]), - "col": np.arange(left_data.shape[2]), - }, - ) - left.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - left.pipe(add_disparity, disparity=[-2, 1], window=None) - - right_data = np.array( - [ - [[0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 2, 1], [0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]], - [[1, 1, 1, 2, 2, 2], [1, 1, 1, 4, 2, 4], [1, 1, 1, 4, 4, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]], - ], - dtype=np.float64, - ) - right = xr.Dataset( - {"im": (["band_im", "row", "col"], right_data)}, - coords={ - "band_im": ["r", "g"], - "row": np.arange(right_data.shape[1]), - "col": np.arange(right_data.shape[2]), - }, - ) - - right.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - - # create matching_cost object - stereo_matcher = matching_cost.AbstractMatchingCost( - **{"matching_cost_method": "sad", "window_size": 3, "subpix": 1, "band": "g"} - ) - - # Compute bright standard deviation inside a window of size 3 and 
create the confidence measure - std_bright_ground_truth = np.array( - [ - [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], - [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(92 / 81), np.sqrt(92 / 81), np.nan], - [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], - ], - dtype=np.float32, - ) - - std_bright_ground_truth = std_bright_ground_truth.reshape((5, 6, 1)) - - # compute with compute_cost_volume - grid = stereo_matcher.allocate_cost_volume( - left, (left["disparity"].sel(band_disp="min"), left["disparity"].sel(band_disp="max")) - ) - - # Compute validity mask - grid = validity_mask(left, right, grid) - - cv = stereo_matcher.compute_cost_volume(img_left=left, img_right=right, cost_volume=grid) - stereo_matcher.cv_masked( - left, - right, - cv, - left["disparity"].sel(band_disp="min"), - left["disparity"].sel(band_disp="max"), - ) - - std_intensity = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "std_intensity"}) - - # Compute confidence prediction - _, cv_with_intensity = std_intensity.confidence_prediction(None, left, right, cv) - - # Check if the calculated confidence_measure is equal to the ground truth (same shape and all elements equals) - assert np.sum(cv_with_intensity.coords["indicator"].data != ["confidence_from_intensity_std"]) == 0 - np.testing.assert_array_equal(cv_with_intensity["confidence_measure"].data, std_bright_ground_truth) - - @staticmethod - def test_compute_ambiguity_and_sampled_ambiguity(): - """ - Test ambiguity and sampled ambiguity - - """ - cv_ = np.array( - [ - [[np.nan, 1, 3], [4, 1, 1], [np.nan, np.nan, np.nan]], - [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]], - ], - dtype=np.float32, - ) - - ambiguity_ = confidence.AbstractCostVolumeConfidence( - **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1} - ) - - amb, sampled_amb = ambiguity_.compute_ambiguity_and_sampled_ambiguity(cv_, 0.0, 0.2, 0.1) - - # Ambiguity integral - gt_amb_int = np.array([[4.0, 4.0, 6.0], [6.0, 6.0, 6.0]]) - - # Sampled ambiguity - gt_sam_amb = np.array([[[2, 2], [2, 2], [3, 3]], [[3, 3], [3, 3], [3, 3]]], dtype=np.float32) - - # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(amb, gt_amb_int, rtol=1e-06) - np.testing.assert_allclose(sampled_amb, gt_sam_amb, rtol=1e-06) - - @staticmethod - def test_compute_risk(): - """ - Test the compute_risk method - - """ - risk_ = confidence.AbstractCostVolumeConfidence( - **{"confidence_method": "risk", "eta_max": 0.5, "eta_step": 0.3} - ) - cv_ = np.array( - [ - [ - [39, 28, 28, 34.5], - [49, 34, 41.5, 34], - [np.nan, np.nan, np.nan, np.nan], - ] - ], - dtype=np.float32, - ) - - sampled_ambiguity = np.array([[[2.0, 2.0], [2.0, 2.0], [4.0, 4.0]]], dtype=np.float32) - max_cost = 49 - min_cost = 28 - normalized_min_cost = [ # pylint:disable=unused-variable - (28 - min_cost) / (max_cost - min_cost), - (34 - min_cost) / (max_cost - min_cost), - np.nan, - ] - normalized_cv = [ # pylint:disable=unused-variable - [ - (39 - min_cost) / (max_cost - min_cost), - (28.01 - min_cost) / (max_cost - min_cost), - (28 - min_cost) / (max_cost - min_cost), - (34.5 - min_cost) / (max_cost - min_cost), - ], - [ - (49 - min_cost) / (max_cost - min_cost), - (41.5 - min_cost) / (max_cost - min_cost), - (34.1 - min_cost) / (max_cost - min_cost), - (34 - min_cost) / (max_cost - min_cost), - ], - 
[np.nan, np.nan, np.nan, np.nan], - ] - - # invalidate similarity values outside of [min;min+eta[ - masked_normalized_cv = [ # pylint:disable=unused-variable - [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], - [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], - [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], - [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], - [np.nan, np.nan, np.nan, np.nan], - ] - - disparities = [ # pylint:disable=unused-variable - [np.nan, 1, 2, np.nan], - [np.nan, 1, 2, np.nan], - [np.nan, 1, np.nan, 3], - [np.nan, 1, np.nan, 3], - [np.nan, np.nan, np.nan, np.nan], - [np.nan, np.nan, np.nan, np.nan], - ] - # Risk mas is defined as risk(p,k) = mean(max(di) - min(di)) for di in [cmin(p);cmin(p)+kŋ[ - gt_risk_max = [[((2 - 1) + (2 - 1)) / 2, ((3 - 1) + (3 - 1)) / 2, np.nan]] - # Risk min is defined as mean( (1+risk(p,k)) - amb(p,k) ) - gt_risk_min = [ - [ - (((1 + 1) - sampled_ambiguity[0][0][0]) + ((1 + 1) - sampled_ambiguity[0][0][1])) / 2, - (((1 + 2) - sampled_ambiguity[0][1][0]) + ((1 + 2) - sampled_ambiguity[0][1][1])) / 2, - np.nan, - ] - ] - - # Compute risk - risk_max, risk_min = risk_.compute_risk(cv_, sampled_ambiguity, 0.0, 0.5, 0.3) - - # Check if the calculated risks are equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) - np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) - - @staticmethod - def test_compute_risk_and_sampled_risk(): - """ - Test the compute_risk_and_sampled_risk method - - """ - risk_ = confidence.AbstractCostVolumeConfidence( - **{"confidence_method": "risk", "eta_max": 0.5, "eta_step": 0.3} - ) - cv_ = np.array( - [ - [ - [39, 28, 28, 34.5], - [49, 34, 41.5, 34], - [np.nan, np.nan, np.nan, np.nan], - ] - ], - dtype=np.float32, - ) - - sampled_ambiguity = np.array([[[2.0, 2.0], [2.0, 2.0], [4.0, 4.0]]], dtype=np.float32) - max_cost = 49 - min_cost = 28 - normalized_min_cost = [ # pylint:disable=unused-variable - (28 - min_cost) / (max_cost - min_cost), - (34 - min_cost) / (max_cost - min_cost), - np.nan, - ] - normalized_cv = [ # pylint:disable=unused-variable - [ - (39 - min_cost) / (max_cost - min_cost), - (28.01 - min_cost) / (max_cost - min_cost), - (28 - min_cost) / (max_cost - min_cost), - (34.5 - min_cost) / (max_cost - min_cost), - ], - [ - (49 - min_cost) / (max_cost - min_cost), - (41.5 - min_cost) / (max_cost - min_cost), - (34.1 - min_cost) / (max_cost - min_cost), - (34 - min_cost) / (max_cost - min_cost), - ], - [np.nan, np.nan, np.nan, np.nan], - ] - - # invalidate similarity values outside of [min;min+eta[ - masked_normalized_cv = [ # pylint:disable=unused-variable - [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], - [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], - [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], - [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], - [np.nan, np.nan, np.nan, np.nan], - ] - - disparities = [ # pylint:disable=unused-variable - [np.nan, 1, 2, np.nan], - [np.nan, 1, 2, np.nan], - [np.nan, 1, np.nan, 3], - [np.nan, 1, np.nan, 3], - [np.nan, np.nan, np.nan, np.nan], - [np.nan, np.nan, 
np.nan, np.nan], - ] - # Risk max is defined as risk(p,k) = mean(max(di) - min(di)) for di in [cmin(p);cmin(p)+kŋ[ - gt_risk_max = [[((2 - 1) + (2 - 1)) / 2, ((3 - 1) + (3 - 1)) / 2, np.nan]] - # Risk min is defined as mean( (1+risk(p,k)) - amb(p,k) ) - gt_risk_min = [ - [ - (((1 + 1) - sampled_ambiguity[0][0][0]) + ((1 + 1) - sampled_ambiguity[0][0][1])) / 2, - (((1 + 2) - sampled_ambiguity[0][1][0]) + ((1 + 2) - sampled_ambiguity[0][1][1])) / 2, - np.nan, - ] - ] - - gt_sampled_risk_max = [[[(2 - 1), (2 - 1)], [(3 - 1), (3 - 1)], [np.nan, np.nan]]] - gt_sampled_risk_min = [ - [ - [(1 + 1) - sampled_ambiguity[0][0][0], (1 + 1) - sampled_ambiguity[0][0][1]], - [(1 + 2) - sampled_ambiguity[0][1][0], (1 + 2) - sampled_ambiguity[0][1][1]], - [np.nan, np.nan], - ] - ] - - # Compute risk - risk_max, risk_min, sampled_risk_max, sampled_risk_min = risk_.compute_risk_and_sampled_risk( - cv_, sampled_ambiguity, 0.0, 0.5, 0.3 - ) - - # Check if the calculated risks are equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) - np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) - # Check if the calculated sampled risks are equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(gt_sampled_risk_max, sampled_risk_max, rtol=1e-06) - np.testing.assert_allclose(gt_sampled_risk_min, sampled_risk_min, rtol=1e-06) - - @staticmethod - def test_interval_bounds(): - """ - Test the interval bounds method using the pandora run method - - """ - # Create left and right images - left_im = np.array([[2, 5, 3, 1], [5, 3, 2, 1], [4, 2, 3, 2], [4, 5, 3, 2]], dtype=np.float32) - - mask_ = np.array([[0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 1]], dtype=np.int16) - - left_im = xr.Dataset( - {"im": (["row", "col"], left_im), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(left_im.shape[0]), "col": np.arange(left_im.shape[1])}, - ) - # Add image conf to the image dataset - - left_im.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - left_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - right_im = np.array([[1, 2, 1, 2], [2, 3, 5, 3], [0, 2, 4, 2], [5, 3, 1, 4]], dtype=np.float32) - - mask_ = np.full((4, 4), 0, dtype=np.int16) - - right_im = xr.Dataset( - {"im": (["row", "col"], right_im), "msk": (["row", "col"], mask_)}, - coords={"row": np.arange(right_im.shape[0]), "col": np.arange(right_im.shape[1])}, - ) - # Add image conf to the image dataset - right_im.attrs = { - "no_data_img": 0, - "valid_pixels": 0, - "no_data_mask": 1, - "crs": None, - "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), - } - right_im.pipe(add_disparity, disparity=[-1, 1], window=None) - - user_cfg = { - "input": {"left": {"disp_min": -1, "disp_max": 1}}, - "pipeline": { - "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, - "cost_volume_confidence": {"confidence_method": "interval_bounds", "possibility_threshold": 0.7}, - "disparity": {"disparity_method": "wta"}, - "filter": {"filter_method": "median"}, - }, - } - pandora_machine = PandoraMachine() - - # Update the user configuration with default values - cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) - - # Run the pandora pipeline - left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) - - assert ( - np.sum( - left.coords["indicator"].data - != [ - 
"confidence_from_interval_bounds_inf", - "confidence_from_interval_bounds_sup", - ] - ) - == 0 - ) - - # ----- Check interval results ------ - - # Cost volume Normalized cost volume - # [[[nan 1. 0.] [[[ nan 0.25 0. ] - # [ 4. 3. 4.] [1. 0.75 1. ] - # [ 1. 2. 1.] [0.25 0.5 0.25] - # [ 0. 1. nan]] [0. 0.25 nan]] - # [[nan 3. 2.] [[ nan 0.75 0.5 ] - # [nan nan nan] [ nan nan nan] - # [ 1. 3. 1.] [0.25 0.75 0.25] - # [ 4. 2. nan]] [1. 0.5 nan]] - # [[nan 4. 2.] [[ nan 1. 0.5 ] - # [ 2. 0. 2.] [0.5 0. 0.5 ] - # [ 1. 1. 1.] [0.25 0.25 0.25] - # [ 2. 0. nan]] [0.5 0. nan]] - # [[nan 1. 1.] [[ nan 0.25 0.25] - # [ 0. 2. 4.] [0. 0.5 1. ] - # [ 0. 2. 1.] [0. 0.5 0.25] - # [nan nan nan]]] [ nan nan nan]]] - # - # - # - # Possibility - # [[[ nan, 0.75, 1. ], - # [0.75, 1. , 0.75], - # [1. , 0.75, 1. ], - # [1. , 0.75, nan]], - # [[ nan, 0.75, 1. ], - # [ nan, nan, nan], - # [1. , 0.5 , 1. ], - # [0.5 , 1. , nan]], - # [[ nan, 0.5 , 1. ], - # [0.5 , 1. , 0.5 ], - # [1. , 1. , 1. ], - # [0.5 , 1. , nan]], - # [[ nan, 1. , 1. ], - # [1. , 0.5 , 0. ], - # [1. , 0.5 , 0.75], - # [ nan, nan, nan]]] - - # Infimum and supremum bound of the interval confidence - # If the interval bound has a possibility of 1, - # the intervals are extended by one (refinement) - inf_bound_gt = np.array( - [[0, -1, -1, -1], [0, np.nan, -1, -1], [0, -1, -1, -1], [-1, -1, -1, np.nan]], dtype=np.float32 - ) - sup_bound_gt = np.array([[1, 1, 1, 0], [1, np.nan, 1, 1], [1, 1, 1, 1], [1, 0, 1, np.nan]], dtype=np.float32) - - # Check if the calculated intervals is equal to the ground truth (same shape and all elements equals) - np.testing.assert_allclose(left["confidence_measure"].data[:, :, 0], inf_bound_gt, rtol=1e-06) - np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], sup_bound_gt, rtol=1e-06) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_confidence/conftest.py b/tests/test_confidence/conftest.py new file mode 100644 index 0000000..9afd081 --- /dev/null +++ b/tests/test_confidence/conftest.py @@ -0,0 +1,159 @@ +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Set of fixtures available to all confidence tests. 
+""" + +import numpy as np +import pytest +import xarray as xr +from rasterio import Affine + +from pandora.img_tools import add_disparity, add_disparity_grid + + +@pytest.fixture() +def create_img_for_confidence(): + """ + Fixture containing left and right images for confidence tests + """ + + # Create left and right images + left_im = np.array([[2, 5, 3, 1], [5, 3, 2, 1], [4, 2, 3, 2], [4, 5, 3, 2]], dtype=np.float32) + + mask_ = np.array([[0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 0], [0, 0, 0, 1]], dtype=np.int16) + + left_im = xr.Dataset( + {"im": (["row", "col"], left_im), "msk": (["row", "col"], mask_)}, + coords={"row": np.arange(left_im.shape[0]), "col": np.arange(left_im.shape[1])}, + ) + # Add image conf to the image dataset + + left_im.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + left_im.pipe(add_disparity, disparity=[-1, 1], window=None) + + right_im = np.array([[1, 2, 1, 2], [2, 3, 5, 3], [0, 2, 4, 2], [5, 3, 1, 4]], dtype=np.float32) + + mask_ = np.full((4, 4), 0, dtype=np.int16) + + right_im = xr.Dataset( + {"im": (["row", "col"], right_im), "msk": (["row", "col"], mask_)}, + coords={"row": np.arange(right_im.shape[0]), "col": np.arange(right_im.shape[1])}, + ) + # Add image conf to the image dataset + right_im.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + right_im.pipe(add_disparity, disparity=[-1, 1], window=None) + + return left_im, right_im + + +@pytest.fixture() +def create_grids_and_disparity_range_with_variable_disparities(): + """ + Fixture containing grids and disparity range for tests with variable disparities + """ + grids = np.array( + [ + [[-1, 0, -1, 0], [0, -1, 0, -1], [0, 0, 0, -1], [-1, -1, -1, -1]], + [[1, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 0], [0, 0, 0, 1]], + ], + dtype=np.int64, + ) + disparity_range = np.array([-1, 0, 1], dtype=np.float32) + + return grids, disparity_range + + +@pytest.fixture() +def create_cv_for_variable_disparities(): + """ + Fixture containing cv for tests with variable disparities + """ + cv_ = np.array( + [ + [[np.nan, 1, 3, 2], [4, 1, 1, 1], [np.nan, np.nan, np.nan, np.nan], [np.nan, 1, 3, 2]], + [ + [5, np.nan, np.nan, np.nan], + [6.2, np.nan, np.nan, np.nan], + [0, np.nan, 0, 0], + [5, np.nan, np.nan, np.nan], + ], + [[np.nan, 2, 4, 5], [np.nan, 5, 0, 1], [0, 0, 2, np.nan], [np.nan, 2, 4, 5]], + ], + dtype=np.float32, + ) + cv_ = np.rollaxis(cv_, 0, 3) + + return cv_ + + +@pytest.fixture() +def create_images(create_grids_and_disparity_range_with_variable_disparities): + """Make images with a disparity grid.""" + + grids, _ = create_grids_and_disparity_range_with_variable_disparities + + disparity = xr.DataArray(grids, dims=["band_disp", "row", "col"]) + data = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 0, 1, 2], [3, 4, 5, 6]], dtype=np.float64) + left = xr.Dataset( + data_vars={ + "im": (["row", "col"], data), + }, + coords={ + "row": np.arange(data.shape[0]), + "col": np.arange(data.shape[1]), + }, + attrs={ + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + }, + ).pipe(add_disparity_grid, disparity) + + data = np.array([[7, 1, 5, 9], [8, 2, 6, 0], [9, 3, 7, 1], [0, 4, 8, 2]], dtype=np.float64) + right = xr.Dataset( + data_vars={ + "im": (["row", "col"], data), + }, + coords={ + "row": np.arange(data.shape[0]), + "col": np.arange(data.shape[1]), + }, + attrs={ + 
"valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + }, + ).pipe(add_disparity, disparity=disparity, window=None) + + return left, right, grids diff --git a/tests/test_confidence/test_ambiguity.py b/tests/test_confidence/test_ambiguity.py new file mode 100644 index 0000000..99331d3 --- /dev/null +++ b/tests/test_confidence/test_ambiguity.py @@ -0,0 +1,196 @@ +# type:ignore +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module contains functions to test the confidence module with ambiguity. +""" + +import numpy as np +import xarray as xr + +import pandora.cost_volume_confidence as confidence + + +def test_ambiguity(create_img_for_confidence): + """ + Test the ambiguity method + + """ + + left_im, right_im = create_img_for_confidence + + cv_ = np.array( + [[[np.nan, 1, 3], [4, 1, 1], [1.2, 1, 2]], [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]]], + dtype=np.float32, + ) + + cv_ = xr.Dataset( + {"cost_volume": (["row", "col", "disp"], cv_)}, coords={"row": [0, 1], "col": [0, 1, 2], "disp": [-1, 0, 1]} + ) + + ambiguity_ = confidence.AbstractCostVolumeConfidence( + **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1} + ) + + # Apply median filter to the disparity map. Median filter is only applied on valid pixels. + ambiguity_.confidence_prediction(None, left_im, right_im, cv_) + + # Ambiguity integral not normalized + _ = np.array([[4.0, 4.0, 3.0], [6.0, 6.0, 6.0]]) + # Normalized ambiguity + amb_int = np.array([[(4.0 - 3.05) / (6.0 - 3.05), (4.0 - 3.05) / (6.0 - 3.05), 0], [1.0, 1.0, 1.0]]) + # Ambiguity to confidence measure + ambiguity_ground_truth = 1 - amb_int + + # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(cv_["confidence_measure"].data[:, :, 0], ambiguity_ground_truth, rtol=1e-06) + + +def test_ambiguity_without_normalization(create_img_for_confidence): + """ + Test the ambiguity method + """ + + left_im, right_im = create_img_for_confidence + + cv_ = np.array( + [[[np.nan, 1, 3], [4, 1, 1], [1.2, 1, 2]], [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]]], + dtype=np.float32, + ) + + cv_ = xr.Dataset( + {"cost_volume": (["row", "col", "disp"], cv_)}, coords={"row": [0, 1], "col": [0, 1, 2], "disp": [-1, 0, 1]} + ) + + ambiguity_ = confidence.AbstractCostVolumeConfidence( + **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1, "normalization": False} + ) + + # Apply median filter to the disparity map. Median filter is only applied on valid pixels. 
+    ambiguity_.confidence_prediction(None, left_im, right_im, cv_)
+
+    # Ambiguity integral not normalized
+    amb_int_not_normalized = np.array([[4.0, 4.0, 3.0], [6.0, 6.0, 6.0]])
+    # Ambiguity to confidence measure
+    ambiguity_ground_truth = 1 - amb_int_not_normalized
+
+    # Check that the calculated ambiguity is equal to the ground truth (same shape and all elements equal)
+    np.testing.assert_allclose(cv_["confidence_measure"].data[:, :, 0], ambiguity_ground_truth, rtol=1e-06)
+
+
+def test_compute_ambiguity_and_sampled_ambiguity():
+    """
+    Test ambiguity and sampled ambiguity
+
+    """
+    cv_ = np.array(
+        [
+            [[np.nan, 1, 3], [4, 1, 1], [np.nan, np.nan, np.nan]],
+            [[5, np.nan, np.nan], [6.2, np.nan, np.nan], [0, np.nan, 0]],
+        ],
+        dtype=np.float32,
+    )
+
+    grids = np.array([-1 * np.ones((2, 3)), np.ones((2, 3))], dtype="int64")
+    disparity_range = np.array([-1, 0, 1], dtype="float32")
+
+    ambiguity_ = confidence.AbstractCostVolumeConfidence(
+        **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1}
+    )
+
+    etas = np.arange(0.0, 0.2, 0.1)
+    nbr_etas = etas.shape[0]
+
+    amb, sampled_amb = ambiguity_.compute_ambiguity_and_sampled_ambiguity(cv_, etas, nbr_etas, grids, disparity_range)
+
+    # Ambiguity integral
+    gt_amb_int = np.array([[4.0, 4.0, 6.0], [6.0, 6.0, 6.0]])
+
+    # Sampled ambiguity
+    gt_sam_amb = np.array([[[2, 2], [2, 2], [3, 3]], [[3, 3], [3, 3], [3, 3]]], dtype=np.float32)
+
+    # Check that the calculated ambiguity is equal to the ground truth (same shape and all elements equal)
+    np.testing.assert_allclose(amb, gt_amb_int, rtol=1e-06)
+    np.testing.assert_allclose(sampled_amb, gt_sam_amb, rtol=1e-06)
+
+
+def test_compute_ambiguity_with_variable_disparity(
+    create_grids_and_disparity_range_with_variable_disparities,
+    create_cv_for_variable_disparities,
+):
+    """
+    Test ambiguity with variable disparity interval
+    """
+
+    grids, disparity_range = create_grids_and_disparity_range_with_variable_disparities
+
+    cv_ = create_cv_for_variable_disparities
+
+    ambiguity_ = confidence.AbstractCostVolumeConfidence(
+        **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1}
+    )
+
+    etas = np.arange(0.0, 0.2, 0.1)
+    nbr_etas = etas.shape[0]
+
+    amb = ambiguity_.compute_ambiguity(cv_, etas, nbr_etas, grids, disparity_range)
+
+    # Ambiguity integral
+    gt_amb_int = np.array([[6.0, 4.0, 4.0, 4.0], [4.0, 4.0, 4.0, 6.0], [4.0, 4.0, 2.0, 4.0], [4.0, 4.0, 4.0, 4.0]])
+
+    # Check that the calculated ambiguity is equal to the ground truth (same shape and all elements equal)
+    np.testing.assert_allclose(amb, gt_amb_int, rtol=1e-06)
+
+
+def test_compute_ambiguity_and_sampled_ambiguity_with_variable_disparity(
+    create_grids_and_disparity_range_with_variable_disparities,
+    create_cv_for_variable_disparities,
+):
+    """
+    Test ambiguity and sampled ambiguity with variable disparity interval
+    """
+
+    grids, disparity_range = create_grids_and_disparity_range_with_variable_disparities
+
+    cv_ = create_cv_for_variable_disparities
+
+    ambiguity_ = confidence.AbstractCostVolumeConfidence(
+        **{"confidence_method": "ambiguity", "eta_max": 0.2, "eta_step": 0.1}
+    )
+
+    etas = np.arange(0.0, 0.2, 0.1)
+    nbr_etas = etas.shape[0]
+
+    amb, amb_sampl = ambiguity_.compute_ambiguity_and_sampled_ambiguity(cv_, etas, nbr_etas, grids, disparity_range)
+
+    # Ambiguity integral
+    gt_amb_int = np.array([[6.0, 4.0, 4.0, 4.0], [4.0, 4.0, 4.0, 6.0], [4.0, 4.0, 2.0, 4.0], [4.0, 4.0, 4.0, 4.0]])
+
+    gt_sampl_amb = np.array(
+        [
+            [[3.0, 3.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]],
+            [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0],
[3.0, 3.0]], + [[2.0, 2.0], [2.0, 2.0], [1.0, 1.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]], + ] + ) + + # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(amb, gt_amb_int, rtol=1e-06) + np.testing.assert_allclose(amb_sampl, gt_sampl_amb, rtol=1e-06) diff --git a/tests/test_confidence/test_interval_bounds.py b/tests/test_confidence/test_interval_bounds.py new file mode 100644 index 0000000..dc67f2e --- /dev/null +++ b/tests/test_confidence/test_interval_bounds.py @@ -0,0 +1,116 @@ +# type:ignore +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module contains functions to test the confidence module with interval bounds. +""" + +import numpy as np + +import pandora +from pandora.state_machine import PandoraMachine + + +def test_interval_bounds(create_img_for_confidence): + """ + Test the interval bounds method using the pandora run method + + """ + + left_im, right_im = create_img_for_confidence + + user_cfg = { + "input": {"left": {"disp_min": -1, "disp_max": 1}}, + "pipeline": { + "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, + "cost_volume_confidence": {"confidence_method": "interval_bounds", "possibility_threshold": 0.7}, + "disparity": {"disparity_method": "wta"}, + "filter": {"filter_method": "median"}, + }, + } + pandora_machine = PandoraMachine() + + # Update the user configuration with default values + cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) + + # Run the pandora pipeline + left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) + + assert ( + np.sum( + left.coords["indicator"].data + != [ + "confidence_from_interval_bounds_inf", + "confidence_from_interval_bounds_sup", + ] + ) + == 0 + ) + + # ----- Check interval results ------ + + # Cost volume Normalized cost volume + # [[[nan 1. 0.] [[[ nan 0.25 0. ] + # [ 4. 3. 4.] [1. 0.75 1. ] + # [ 1. 2. 1.] [0.25 0.5 0.25] + # [ 0. 1. nan]] [0. 0.25 nan]] + # [[nan 3. 2.] [[ nan 0.75 0.5 ] + # [nan nan nan] [ nan nan nan] + # [ 1. 3. 1.] [0.25 0.75 0.25] + # [ 4. 2. nan]] [1. 0.5 nan]] + # [[nan 4. 2.] [[ nan 1. 0.5 ] + # [ 2. 0. 2.] [0.5 0. 0.5 ] + # [ 1. 1. 1.] [0.25 0.25 0.25] + # [ 2. 0. nan]] [0.5 0. nan]] + # [[nan 1. 1.] [[ nan 0.25 0.25] + # [ 0. 2. 4.] [0. 0.5 1. ] + # [ 0. 2. 1.] [0. 0.5 0.25] + # [nan nan nan]]] [ nan nan nan]]] + # + # + # + # Possibility + # [[[ nan, 0.75, 1. ], + # [0.75, 1. , 0.75], + # [1. , 0.75, 1. ], + # [1. , 0.75, nan]], + # [[ nan, 0.75, 1. ], + # [ nan, nan, nan], + # [1. , 0.5 , 1. ], + # [0.5 , 1. , nan]], + # [[ nan, 0.5 , 1. ], + # [0.5 , 1. , 0.5 ], + # [1. , 1. , 1. ], + # [0.5 , 1. , nan]], + # [[ nan, 1. , 1. ], + # [1. , 0.5 , 0. ], + # [1. 
, 0.5 , 0.75], + # [ nan, nan, nan]]] + + # Infimum and supremum bound of the interval confidence + # If the interval bound has a possibility of 1, + # the intervals are extended by one (refinement) + inf_bound_gt = np.array( + [[0, -1, -1, -1], [0, np.nan, -1, -1], [0, -1, -1, -1], [-1, -1, -1, np.nan]], dtype=np.float32 + ) + sup_bound_gt = np.array([[1, 1, 1, 0], [1, np.nan, 1, 1], [1, 1, 1, 1], [1, 0, 1, np.nan]], dtype=np.float32) + + # Check if the calculated intervals is equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(left["confidence_measure"].data[:, :, 0], inf_bound_gt, rtol=1e-06) + np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], sup_bound_gt, rtol=1e-06) diff --git a/tests/test_confidence/test_multiple_confidence.py b/tests/test_confidence/test_multiple_confidence.py new file mode 100644 index 0000000..6462c5c --- /dev/null +++ b/tests/test_confidence/test_multiple_confidence.py @@ -0,0 +1,177 @@ +# type:ignore +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module contains functions to test the confidence module with mixed measure. +""" + +import numpy as np +import xarray as xr +from rasterio import Affine + +import pandora +from pandora.img_tools import add_disparity +from pandora.state_machine import PandoraMachine + + +def test_ambiguity_std_full_pipeline(create_img_for_confidence): + """ + Test the ambiguity and std_intensity methods using the pandora run method + + """ + left_im, right_im = create_img_for_confidence + + user_cfg = { + "input": {"disp_left": (-1, 1)}, + "pipeline": { + "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, + "cost_volume_confidence": {"confidence_method": "std_intensity"}, + "cost_volume_confidence.2": {"confidence_method": "ambiguity", "eta_max": 0.3, "eta_step": 0.25}, + "disparity": {"disparity_method": "wta"}, + "filter": {"filter_method": "median"}, + }, + } + pandora_machine = PandoraMachine() + + # Update the user configuration with default values + cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) + + # Run the pandora pipeline + left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) + + assert ( + np.sum(left.coords["indicator"].data != ["confidence_from_intensity_std", "confidence_from_ambiguity.2"]) == 0 + ) + + # ----- Check ambiguity results ------ + + # Cost volume Normalized cost volume + # [[[nan 1. 0.] [[[ nan 0.25 0. ] + # [ 4. 3. 4.] [1. 0.75 1. ] + # [ 1. 2. 1.] [0.25 0.5 0.25] + # [ 0. 1. nan]] [0. 0.25 nan]] + # [[nan 3. 2.] [[ nan 0.75 0.5 ] + # [nan nan nan] [ nan nan nan] + # [ 1. 3. 1.] [0.25 0.75 0.25] + # [ 4. 2. nan]] [1. 0.5 nan]] + # [[nan 4. 2.] [[ nan 1. 0.5 ] + # [ 2. 0. 2.] [0.5 0. 0.5 ] + # [ 1. 1. 1.] [0.25 0.25 0.25] + # [ 2. 0. nan]] [0.5 0. nan]] + # [[nan 1. 1.] 
[[ nan 0.25 0.25] + # [ 0. 2. 4.] [0. 0.5 1. ] + # [ 0. 2. 1.] [0. 0.5 0.25] + # [nan nan nan]]] [ nan nan nan]]] + # + # Ambiguity integral not normalized + amb_int = np.array([[5.0, 4.0, 5.0, 5.0], [5.0, 6.0, 4.0, 4.0], [4.0, 2.0, 6.0, 4.0], [6.0, 2.0, 3.0, 6.0]]) + # Normalized ambiguity + amb_int = np.array( + [ + [(5.0 - 2) / 4.0, (4.0 - 2) / 4.0, (5.0 - 2) / 4.0, (5.0 - 2) / 4.0], + [(5.0 - 2) / 4.0, (6.0 - 2) / 4.0, (4.0 - 2) / 4.0, (4.0 - 2) / 4.0], + [(4.0 - 2) / 4.0, (2.0 - 2) / 4.0, (6.0 - 2) / 4.0, (4.0 - 2) / 4.0], + [(6.0 - 2) / 4.0, (2.0 - 2) / 4.0, (3.0 - 2) / 4.0, (6.0 - 2) / 4.0], + ] + ) + # Ambiguity to confidence measure + ambiguity_ground_truth = 1 - amb_int + + # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], ambiguity_ground_truth, rtol=1e-06) + + # ----- Check std_intensity results ------ + std_intensity_gt = np.array( + [[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]] + ) + # Check if the calculated std_intensity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_array_equal(left["confidence_measure"].data[:, :, 0], std_intensity_gt) + + +def test_non_normalized_ambiguity_std_full_pipeline(create_img_for_confidence): + """ + Test the non normalized ambiguity and std_intensity methods using the pandora run method + + """ + left_im, right_im = create_img_for_confidence + + user_cfg = { + "input": {"disp_left": (-1, 1)}, + "pipeline": { + "matching_cost": {"matching_cost_method": "sad", "window_size": 1, "subpix": 1}, + "cost_volume_confidence": {"confidence_method": "std_intensity"}, + "cost_volume_confidence.2": { + "confidence_method": "ambiguity", + "eta_max": 0.3, + "eta_step": 0.25, + "normalization": False, + }, + "disparity": {"disparity_method": "wta"}, + "filter": {"filter_method": "median"}, + }, + } + pandora_machine = PandoraMachine() + + # Update the user configuration with default values + cfg = pandora.check_configuration.update_conf(pandora.check_configuration.default_short_configuration, user_cfg) + + # Run the pandora pipeline + left, _ = pandora.run(pandora_machine, left_im, right_im, cfg) + + assert ( + np.sum(left.coords["indicator"].data != ["confidence_from_intensity_std", "confidence_from_ambiguity.2"]) == 0 + ) + + # ----- Check ambiguity results ------ + + # Cost volume Normalized cost volume + # [[[nan 1. 0.] [[[ nan 0.25 0. ] + # [ 4. 3. 4.] [1. 0.75 1. ] + # [ 1. 2. 1.] [0.25 0.5 0.25] + # [ 0. 1. nan]] [0. 0.25 nan]] + # [[nan 3. 2.] [[ nan 0.75 0.5 ] + # [nan nan nan] [ nan nan nan] + # [ 1. 3. 1.] [0.25 0.75 0.25] + # [ 4. 2. nan]] [1. 0.5 nan]] + # [[nan 4. 2.] [[ nan 1. 0.5 ] + # [ 2. 0. 2.] [0.5 0. 0.5 ] + # [ 1. 1. 1.] [0.25 0.25 0.25] + # [ 2. 0. nan]] [0.5 0. nan]] + # [[nan 1. 1.] [[ nan 0.25 0.25] + # [ 0. 2. 4.] [0. 0.5 1. ] + # [ 0. 2. 1.] [0. 
0.5 0.25] + # [nan nan nan]]] [ nan nan nan]]] + # + # Ambiguity integral not normalized + amb_int_not_normalized = np.array( + [[5.0, 4.0, 5.0, 5.0], [5.0, 6.0, 4.0, 4.0], [4.0, 2.0, 6.0, 4.0], [6.0, 2.0, 3.0, 6.0]] + ) + + # Ambiguity to confidence measure + ambiguity_ground_truth = 1 - amb_int_not_normalized + + # Check if the calculated ambiguity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(left["confidence_measure"].data[:, :, 1], ambiguity_ground_truth, rtol=1e-06) + + # ----- Check std_intensity results ------ + std_intensity_gt = np.array( + [[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]] + ) + # Check if the calculated std_intensity is equal to the ground truth (same shape and all elements equals) + np.testing.assert_array_equal(left["confidence_measure"].data[:, :, 0], std_intensity_gt) diff --git a/tests/test_confidence/test_risk.py b/tests/test_confidence/test_risk.py new file mode 100644 index 0000000..411e3ad --- /dev/null +++ b/tests/test_confidence/test_risk.py @@ -0,0 +1,414 @@ +# type:ignore +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module contains functions to test the confidence module with risk. 
+""" + +import numpy as np + +import pandora.cost_volume_confidence as confidence +from pandora import matching_cost +from pandora.criteria import validity_mask + + +def test_compute_risk(): + """ + Test the compute_risk method + + """ + risk_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "risk", "eta_max": 0.5, "eta_step": 0.3}) + cv_ = np.array( + [ + [ + [39, 28, 28, 34.5], + [49, 34, 41.5, 34], + [np.nan, np.nan, np.nan, np.nan], + ] + ], + dtype=np.float32, + ) + + sampled_ambiguity = np.array([[[2.0, 2.0], [2.0, 2.0], [4.0, 4.0]]], dtype=np.float32) + max_cost = 49 + min_cost = 28 + # normalized_min_cost + _ = [ + (28 - min_cost) / (max_cost - min_cost), + (34 - min_cost) / (max_cost - min_cost), + np.nan, + ] + # normalized_cv + _ = [ + [ + (39 - min_cost) / (max_cost - min_cost), + (28.01 - min_cost) / (max_cost - min_cost), + (28 - min_cost) / (max_cost - min_cost), + (34.5 - min_cost) / (max_cost - min_cost), + ], + [ + (49 - min_cost) / (max_cost - min_cost), + (41.5 - min_cost) / (max_cost - min_cost), + (34.1 - min_cost) / (max_cost - min_cost), + (34 - min_cost) / (max_cost - min_cost), + ], + [np.nan, np.nan, np.nan, np.nan], + ] + + # invalidate similarity values outside [min;min+eta[ + # masked_normalized_cv + _ = [ + [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], + [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan], + [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], + [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)], + [np.nan, np.nan, np.nan, np.nan], + ] + + # disparities + _ = [ + [np.nan, 1, 2, np.nan], + [np.nan, 1, 2, np.nan], + [np.nan, 1, np.nan, 3], + [np.nan, 1, np.nan, 3], + [np.nan, np.nan, np.nan, np.nan], + [np.nan, np.nan, np.nan, np.nan], + ] + # Risk mas is defined as risk(p,k) = mean(max(di) - min(di)) for di in [cmin(p);cmin(p)+kŋ[ + gt_risk_max = np.array([[((2 - 1) + (2 - 1)) / 2, ((3 - 1) + (3 - 1)) / 2, np.nan]]) + # Risk min is defined as mean( (1+risk(p,k)) - amb(p,k) ) + gt_risk_min = np.array( + [ + [ + (((1 + 1) - sampled_ambiguity[0][0][0]) + ((1 + 1) - sampled_ambiguity[0][0][1])) / 2, + (((1 + 2) - sampled_ambiguity[0][1][0]) + ((1 + 2) - sampled_ambiguity[0][1][1])) / 2, + np.nan, + ] + ] + ) + + grids = np.array([-1 * np.ones((3, 4)), np.ones((3, 4))], dtype="int64") + disparity_range = np.array([-1, 0, 1], dtype="float32") + + etas = np.arange(0.0, 0.5, 0.3) + nbr_etas = etas.shape[0] + + # Compute risk + risk_max, risk_min = risk_.compute_risk(cv_, sampled_ambiguity, etas, nbr_etas, grids, disparity_range) + + # Check if the calculated risks are equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) + np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) + + +def test_compute_risk_with_subpix(create_images): + """ + Test the compute_risk method with subpixel disparity interval, non regression test + """ + left, right, grids = create_images + + disparity_range = np.array([-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75, 1.0], dtype=np.float32) + + cfg = {"matching_cost_method": "ssd", "window_size": 1, "subpix": 2} + matching_cost_instance = matching_cost.AbstractMatchingCost(**cfg) + + grid = matching_cost_instance.allocate_cost_volume( + left, (left["disparity"].sel(band_disp="min"), left["disparity"].sel(band_disp="max")), cfg + ) 
+ grid = validity_mask(left, right, grid) + _ = matching_cost_instance.compute_cost_volume(img_left=left, img_right=right, cost_volume=grid) + + # window_size = 1 + cv = np.array( + [ + [ + [np.nan, np.nan, 36.0, 9.0, 0.0], + [25.0, 4.0, 1.0, 1.0, 9.0], + [4.0, 0.0, 4.0, 16.0, 36.0], + [1.0, 9.0, 25.0, np.nan, np.nan], + ], + [ + [np.nan, np.nan, 9.0, 0.0, 9.0], + [4.0, 1.0, 16.0, 4.0, 0.0], + [25.0, 9.0, 1.0, 16.0, 49.0], + [4.0, 25.0, 64.0, np.nan, np.nan], + ], + [ + [np.nan, np.nan, 0.0, 9.0, 36.0], + [81.0, 36.0, 9.0, 25.0, 49.0], + [4.0, 16.0, 36.0, 9.0, 0.0], + [25.0, 4.0, 1.0, np.nan, np.nan], + ], + [ + [np.nan, np.nan, 9.0, 1.0, 1.0], + [16.0, 4.0, 0.0, 4.0, 16.0], + [1.0, 1.0, 9.0, 0.0, 9.0], + [4.0, 1.0, 16.0, np.nan, np.nan], + ], + ], + dtype=np.float32, + ) + + ambiguity_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "ambiguity"}) + + etas = np.arange(0.0, 0.7, 0.01).astype(np.float64) + nbr_etas = etas.shape[0] + + _, sampled_ambiguity = ambiguity_.compute_ambiguity_and_sampled_ambiguity( + cv, etas, nbr_etas, grids, disparity_range + ) + + # Compute risk + risk_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "risk", "eta_max": 0.7, "eta_step": 0.01}) + + risk_max, risk_min = risk_.compute_risk(cv, sampled_ambiguity, etas, nbr_etas, grids, disparity_range) + + gt_risk_max = np.array( + [ + [4.0, 3.3714285, 2.9285715, 4.0], + [3.8285713, 3.8428571, 2.3, 4.0], + [3.1857142, 1.5142857, 3.7142856, 3.5142858], + [4.0, 3.2857144, 3.7428572, 3.942857], + ], + dtype=np.float32, + ) + gt_risk_min = np.array( + [ + [0.8142857, 0.0, 0.0, 1.5714285], + [2.1714287, 0.3, 0.0, 1.3714286], + [2.0, 0.0, 0.8857143, 0.0], + [0.14285715, 0.0, 0.14285715, 0.27142859], + ], + dtype=np.float32, + ) + + np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) + np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) + + +def test_compute_risk_with_variable_disparity( + create_grids_and_disparity_range_with_variable_disparities, create_cv_for_variable_disparities +): + """ + Test the compute_risk method with variable disparity interval + """ + + grids, disparity_range = create_grids_and_disparity_range_with_variable_disparities + + cv_ = create_cv_for_variable_disparities + + amb_sampl = np.array( + [ + [[3.0, 3.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0], [3.0, 3.0]], + [[2.0, 2.0], [2.0, 2.0], [1.0, 1.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]], + ], + dtype=np.float32, + ) + + risk_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "risk", "eta_max": 0.2, "eta_step": 0.1}) + + gt_risk_max = np.array( + [[2.0, 1.5, 1.5, 1.0], [2.0, 1.0, 1.5, 2.0], [2.0, 2.0, 1.0, 2.0], [2.0, 1.5, 1.5, 1.0]], dtype=np.float32 + ) + gt_risk_min = np.array( + [[0.0, 0.5, 0.5, 0.0], [1.0, 0.0, 0.5, 0.0], [1.0, 1.0, 1.0, 1.0], [1.0, 0.5, 0.5, 0.0]], dtype=np.float32 + ) + + etas = np.arange(0.0, 0.5, 0.3) + nbr_etas = etas.shape[0] + + # Compute risk + risk_max, risk_min = risk_.compute_risk(cv_, amb_sampl, etas, nbr_etas, grids, disparity_range) + + # Check if the calculated risks are equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) + np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) + + +def test_compute_risk_and_sampled_risk(): + """ + Test the compute_risk_and_sampled_risk method + + """ + risk_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "risk", "eta_max": 0.5, "eta_step": 0.3}) + 
cv_ = np.array(
+        [
+            [
+                [39, 28, 28, 34.5],
+                [49, 34, 41.5, 34],
+                [np.nan, np.nan, np.nan, np.nan],
+            ]
+        ],
+        dtype=np.float32,
+    )
+
+    sampled_ambiguity = np.array([[[2.0, 2.0], [2.0, 2.0], [4.0, 4.0]]], dtype=np.float32)
+    max_cost = 49
+    min_cost = 28
+    # normalized_min_cost
+    _ = [
+        (28 - min_cost) / (max_cost - min_cost),
+        (34 - min_cost) / (max_cost - min_cost),
+        np.nan,
+    ]
+    # normalized_cv
+    _ = [
+        [
+            (39 - min_cost) / (max_cost - min_cost),
+            (28.01 - min_cost) / (max_cost - min_cost),
+            (28 - min_cost) / (max_cost - min_cost),
+            (34.5 - min_cost) / (max_cost - min_cost),
+        ],
+        [
+            (49 - min_cost) / (max_cost - min_cost),
+            (41.5 - min_cost) / (max_cost - min_cost),
+            (34.1 - min_cost) / (max_cost - min_cost),
+            (34 - min_cost) / (max_cost - min_cost),
+        ],
+        [np.nan, np.nan, np.nan, np.nan],
+    ]
+
+    # invalidate similarity values outside [min;min+eta[
+    # masked_normalized_cv
+    _ = [
+        [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan],
+        [np.nan, (28.01 - min_cost) / (max_cost - min_cost), (28 - min_cost) / (max_cost - min_cost), np.nan],
+        [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)],
+        [np.nan, (34.1 - min_cost) / (max_cost - min_cost), np.nan, (34 - min_cost) / (max_cost - min_cost)],
+        [np.nan, np.nan, np.nan, np.nan],
+    ]
+
+    # disparities
+    _ = [
+        [np.nan, 1, 2, np.nan],
+        [np.nan, 1, 2, np.nan],
+        [np.nan, 1, np.nan, 3],
+        [np.nan, 1, np.nan, 3],
+        [np.nan, np.nan, np.nan, np.nan],
+        [np.nan, np.nan, np.nan, np.nan],
+    ]
+    # Risk max is defined as risk(p,k) = mean(max(di) - min(di)) for di in [cmin(p);cmin(p)+kη[
+    gt_risk_max = np.array([[((2 - 1) + (2 - 1)) / 2, ((3 - 1) + (3 - 1)) / 2, np.nan]])
+    # Risk min is defined as mean( (1+risk(p,k)) - amb(p,k) )
+    gt_risk_min = np.array(
+        [
+            [
+                (((1 + 1) - sampled_ambiguity[0][0][0]) + ((1 + 1) - sampled_ambiguity[0][0][1])) / 2,
+                (((1 + 2) - sampled_ambiguity[0][1][0]) + ((1 + 2) - sampled_ambiguity[0][1][1])) / 2,
+                np.nan,
+            ]
+        ]
+    )
+
+    gt_sampled_risk_max = np.array([[[(2 - 1), (2 - 1)], [(3 - 1), (3 - 1)], [np.nan, np.nan]]])
+    gt_sampled_risk_min = np.array(
+        [
+            [
+                [(1 + 1) - sampled_ambiguity[0][0][0], (1 + 1) - sampled_ambiguity[0][0][1]],
+                [(1 + 2) - sampled_ambiguity[0][1][0], (1 + 2) - sampled_ambiguity[0][1][1]],
+                [np.nan, np.nan],
+            ]
+        ]
+    )
+
+    grids = np.array([-1 * np.ones((3, 4)), np.ones((3, 4))], dtype="int64")
+    disparity_range = np.array([-1, 0, 1], dtype="float32")
+
+    etas = np.arange(0.0, 0.5, 0.3)
+    nbr_etas = etas.shape[0]
+
+    # Compute risk
+    risk_max, risk_min, sampled_risk_max, sampled_risk_min = risk_.compute_risk_and_sampled_risk(
+        cv_, sampled_ambiguity, etas, nbr_etas, grids, disparity_range
+    )
+
+    # Check that the calculated risks are equal to the ground truth (same shape and all elements equal)
+    np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06)
+    np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06)
+    # Check that the calculated sampled risks are equal to the ground truth (same shape and all elements equal)
+    np.testing.assert_allclose(gt_sampled_risk_max, sampled_risk_max, rtol=1e-06)
+    np.testing.assert_allclose(gt_sampled_risk_min, sampled_risk_min, rtol=1e-06)
+
+
+def test_compute_risk_and_sampled_risk_with_variable_disparity(
+    create_grids_and_disparity_range_with_variable_disparities, create_cv_for_variable_disparities
+):
+    """
+    Test the compute_risk_and_sampled_risk method with variable disparity interval
+
+    """
+    grids,
disparity_range = create_grids_and_disparity_range_with_variable_disparities + + cv_ = create_cv_for_variable_disparities + + amb_sampl = np.array( + [ + [[3.0, 3.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0], [3.0, 3.0]], + [[2.0, 2.0], [2.0, 2.0], [1.0, 1.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [2.0, 2.0], [2.0, 2.0]], + ], + dtype=np.float32, + ) + + risk_ = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "risk", "eta_max": 0.2, "eta_step": 0.1}) + + etas = np.arange(0.0, 0.5, 0.3) + nbr_etas = etas.shape[0] + + # Compute risk + risk_max, risk_min, sampled_risk_max, sampled_risk_min = risk_.compute_risk_and_sampled_risk( + cv_, amb_sampl, etas, nbr_etas, grids, disparity_range + ) + + gt_risk_max = np.array( + [[2.0, 1.5, 1.5, 1.0], [2.0, 1.0, 1.5, 2.0], [2.0, 2.0, 1.0, 2.0], [2.0, 1.5, 1.5, 1.0]], dtype=np.float32 + ) + + gt_risk_min = np.array( + [[0.0, 0.5, 0.5, 0.0], [1.0, 0.0, 0.5, 0.0], [1.0, 1.0, 1.0, 1.0], [1.0, 0.5, 0.5, 0.0]], dtype=np.float32 + ) + + gt_sampled_risk_max = np.array( + [ + [[2.0, 2.0], [1.0, 2.0], [1.0, 2.0], [1.0, 1.0]], + [[2.0, 2.0], [1.0, 1.0], [1.0, 2.0], [2.0, 2.0]], + [[2.0, 2.0], [2.0, 2.0], [1.0, 1.0], [2.0, 2.0]], + [[2.0, 2.0], [1.0, 2.0], [1.0, 2.0], [1.0, 1.0]], + ], + dtype=np.float32, + ) + gt_sampled_risk_min = np.array( + [ + [[0.0, 0.0], [0.0, 1.0], [0.0, 1.0], [0.0, 0.0]], + [[1.0, 1.0], [0.0, 0.0], [0.0, 1.0], [0.0, 0.0]], + [[1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0]], + [[1.0, 1.0], [0.0, 1.0], [0.0, 1.0], [0.0, 0.0]], + ], + dtype=np.float32, + ) + + # Check if the calculated risks are equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(gt_risk_max, risk_max, rtol=1e-06) + np.testing.assert_allclose(gt_risk_min, risk_min, rtol=1e-06) + # Check if the calculated sampled risks are equal to the ground truth (same shape and all elements equals) + np.testing.assert_allclose(gt_sampled_risk_max, sampled_risk_max, rtol=1e-06) + np.testing.assert_allclose(gt_sampled_risk_min, sampled_risk_min, rtol=1e-06) diff --git a/tests/test_confidence/test_std_intensity.py b/tests/test_confidence/test_std_intensity.py new file mode 100644 index 0000000..12f4c60 --- /dev/null +++ b/tests/test_confidence/test_std_intensity.py @@ -0,0 +1,214 @@ +# type:ignore +# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES). +# +# This file is part of PANDORA +# +# https://github.com/CNES/Pandora_pandora +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module contains functions to test the confidence module for std_intensity. 
+""" + +import numpy as np +import xarray as xr +from rasterio import Affine + +import pandora.cost_volume_confidence as confidence +from pandora import matching_cost +from pandora.criteria import validity_mask +from pandora.img_tools import add_disparity + + +def test_std_intensity(): + """ + Test the confidence measure std_intensity + """ + # Create a stereo object + left_data = np.array( + ([1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 2, 1], [1, 1, 1, 4, 3, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]), + dtype=np.float32, + ) + left = xr.Dataset( + {"im": (["row", "col"], left_data)}, + coords={"row": np.arange(left_data.shape[0]), "col": np.arange(left_data.shape[1])}, + ) + left.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + left.pipe(add_disparity, disparity=[-2, 1], window=None) + + right_data = np.array( + ([1, 1, 1, 2, 2, 2], [1, 1, 1, 4, 2, 4], [1, 1, 1, 4, 4, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]), + dtype=np.float64, + ) + right = xr.Dataset( + {"im": (["row", "col"], right_data)}, + coords={"row": np.arange(right_data.shape[0]), "col": np.arange(right_data.shape[1])}, + ) + right.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + + # create matching_cost object + stereo_matcher = matching_cost.AbstractMatchingCost( + **{"matching_cost_method": "sad", "window_size": 3, "subpix": 1} + ) + + # Compute bright standard deviation inside a window of size 3 and create the confidence measure + std_bright_ground_truth = np.array( + [ + [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(92 / 81), np.sqrt(92 / 81), np.nan], + [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], + ], + dtype=np.float32, + ) + + std_bright_ground_truth = std_bright_ground_truth.reshape((5, 6, 1)) + + # compute with compute_cost_volume + grid = stereo_matcher.allocate_cost_volume( + left, (left["disparity"].sel(band_disp="min"), left["disparity"].sel(band_disp="max")) + ) + + # Compute validity mask + grid = validity_mask(left, right, grid) + + cv = stereo_matcher.compute_cost_volume(img_left=left, img_right=right, cost_volume=grid) + stereo_matcher.cv_masked( + left, + right, + cv, + left["disparity"].sel(band_disp="min"), + left["disparity"].sel(band_disp="max"), + ) + + std_intensity = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "std_intensity"}) + + # Compute confidence prediction + _, cv_with_intensity = std_intensity.confidence_prediction(None, left, right, cv) + + # Check if the calculated confidence_measure is equal to the ground truth (same shape and all elements equals) + assert np.sum(cv_with_intensity.coords["indicator"].data != ["confidence_from_intensity_std"]) == 0 + np.testing.assert_array_equal(cv_with_intensity["confidence_measure"].data, std_bright_ground_truth) + + +def test_std_intensity_multiband(): + """ + Test the confidence measure std_intensity with multiband input images + """ + # Create a stereo object + left_data = np.array( + [ + [[0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 2, 1], [0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]], + [[1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 2, 1], [1, 1, 1, 4, 3, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]], + ], + dtype=np.float32, + ) + left = xr.Dataset( + {"im": 
(["band_im", "row", "col"], left_data)}, + coords={ + "band_im": ["r", "g"], + "row": np.arange(left_data.shape[1]), + "col": np.arange(left_data.shape[2]), + }, + ) + left.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + left.pipe(add_disparity, disparity=[-2, 1], window=None) + + right_data = np.array( + [ + [[0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 2, 1], [0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]], + [[1, 1, 1, 2, 2, 2], [1, 1, 1, 4, 2, 4], [1, 1, 1, 4, 4, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]], + ], + dtype=np.float64, + ) + right = xr.Dataset( + {"im": (["band_im", "row", "col"], right_data)}, + coords={ + "band_im": ["r", "g"], + "row": np.arange(right_data.shape[1]), + "col": np.arange(right_data.shape[2]), + }, + ) + + right.attrs = { + "no_data_img": 0, + "valid_pixels": 0, + "no_data_mask": 1, + "crs": None, + "transform": Affine(1.0, 0.0, 0.0, 0.0, 1.0, 0.0), + } + + # create matching_cost object + stereo_matcher = matching_cost.AbstractMatchingCost( + **{"matching_cost_method": "sad", "window_size": 3, "subpix": 1, "band": "g"} + ) + + # Compute bright standard deviation inside a window of size 3 and create the confidence measure + std_bright_ground_truth = np.array( + [ + [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(10 / 9), np.sqrt(10 / 9), np.nan], + [np.nan, 0.0, np.sqrt(8 / 9), np.sqrt(92 / 81), np.sqrt(92 / 81), np.nan], + [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], + ], + dtype=np.float32, + ) + + std_bright_ground_truth = std_bright_ground_truth.reshape((5, 6, 1)) + + # compute with compute_cost_volume + grid = stereo_matcher.allocate_cost_volume( + left, (left["disparity"].sel(band_disp="min"), left["disparity"].sel(band_disp="max")) + ) + + # Compute validity mask + grid = validity_mask(left, right, grid) + + cv = stereo_matcher.compute_cost_volume(img_left=left, img_right=right, cost_volume=grid) + stereo_matcher.cv_masked( + left, + right, + cv, + left["disparity"].sel(band_disp="min"), + left["disparity"].sel(band_disp="max"), + ) + + std_intensity = confidence.AbstractCostVolumeConfidence(**{"confidence_method": "std_intensity"}) + + # Compute confidence prediction + _, cv_with_intensity = std_intensity.confidence_prediction(None, left, right, cv) + + # Check if the calculated confidence_measure is equal to the ground truth (same shape and all elements equals) + assert np.sum(cv_with_intensity.coords["indicator"].data != ["confidence_from_intensity_std"]) == 0 + np.testing.assert_array_equal(cv_with_intensity["confidence_measure"].data, std_bright_ground_truth)