diff --git a/src/deep_image_matching/image_matching.py b/src/deep_image_matching/image_matching.py
index c2f9a33..2066ca9 100644
--- a/src/deep_image_matching/image_matching.py
+++ b/src/deep_image_matching/image_matching.py
@@ -4,17 +4,20 @@
 # Extract all the features at once
 # An image is rotated only if it passes the check twice
 # Speed everything up
-
+import gc
 import os
+import random
 import shutil
 from pathlib import Path
 from pprint import pprint
+from typing import List, Optional

 import cv2
 import h5py
 import numpy as np
 import torch
+from PIL import Image
 from tqdm import tqdm

 from . import (
     GeometricVerification,
@@ -40,62 +43,84 @@
 from functools import partial


-def resize(img, new_width):
-    H, W = img.shape[:2]
-    new_height = int(H * 500 / W)
-    img = cv2.resize(img, (new_width, new_height))
-    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
-    return img
+def resize(path_to_img: str, new_width: int) -> Image.Image:
+    """
+    Resize an image to a specified width while maintaining the aspect ratio.
+
+    The image is read from disk, converted to grayscale with PIL, and resized
+    to the given width; the new height is computed so that the original
+    aspect ratio is preserved.
+
+    Parameters:
+    - path_to_img (str): Path to the input image.
+    - new_width (int): Desired width of the resized image.
+
+    Returns:
+    - Image.Image: Resized grayscale PIL image of size (new_width, new_height).
+    """
+    img = Image.open(path_to_img).convert("L")
+    W, H = img.size
+    new_height = int(H * new_width / W)
+    resized_img = img.resize((new_width, new_height))
+
+    return resized_img
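+
+
+# Example: resize("img.jpg", 500) on a 2000x1000 px image yields a 500x250
+# grayscale PIL image, since new_height = int(1000 * 500 / 2000) = 250.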
features["feat1"]) + + F, inlMask = geometric_verification( + features["feat0"]["keypoints"][matches[:, 0], :], + features["feat1"]["keypoints"][matches[:, 1], :], + GeometricVerification.PYDEGENSAC, + threshold=1, + confidence=0.9999, + max_iters=10000, + bool=False, + ) + + verified_matches = np.sum(inlMask) + # print(matches.shape[0], verified_matches) - verified_matches = np.sum(inlMask) - print(matches.shape[0], verified_matches) + else: + verified_matches = 0 # matchesXrotation.append((rotation, matches.shape[0])) matchesXrotation.append((rotation, verified_matches)) return matchesXrotation -def prova( +def upright( cluster0, path_to_upright_dir, rotations, @@ -103,6 +128,8 @@ def prova( SPextractor, LGmatcher, pairs, + resize_size, + processed_pairs, cluster1, ): processed = [] @@ -111,14 +138,19 @@ def prova( for j, img1 in enumerate(cluster1): # for i, img0 in enumerate(cluster0): for i in range(len(cluster0)): - img0 = cluster0[len(cluster0) - 1 - i] - if True:#(img0, img1) in pairs or (img1, img0) in pairs: - print('inside pairs') - - - print(j, i, len(cluster1), len(cluster0)) + # if True: # bruteforce, global descriptors or other ways to reduce image pairs are not used + # if (img0, img1) not in processed_pairs and (img1, img0) not in processed_pairs: + if ( + (img0, img1) in pairs + or (img1, img0) in pairs + and (img0, img1) not in processed_pairs + and (img1, img0) not in processed_pairs + ): + processed_pairs.append((img0, img1)) + # print('inside pairs') + # print(j, i, len(cluster1), len(cluster0)) matchesXrotation = find_matches_per_rotation( str(path_to_upright_dir / img0), str(path_to_upright_dir / img1), @@ -126,6 +158,7 @@ def prova( cv2_rot_params, SPextractor, LGmatcher, + resize_size, ) index_of_max = max( range(len(matchesXrotation)), @@ -135,18 +168,24 @@ def prova( if index_of_max != 0 and n_matches > 100: print(f"ref {img0} rotated {img1} {rotations[index_of_max]}") rotated_images.append((img1, rotations[index_of_max])) - image1 = cv2.imread(str(path_to_upright_dir / img1)) - rotated_image1 = cv2.rotate(image1, cv2_rot_params[index_of_max]) - rotated_image1 = cv2.cvtColor(rotated_image1, cv2.COLOR_BGR2GRAY) - cv2.imwrite(str(path_to_upright_dir / img1), rotated_image1) + image1 = Image.open(str(path_to_upright_dir / img1)).convert("L") + # image1 = cv2.imread(str(path_to_upright_dir / img1)) + # rotated_image1 = cv2.rotate(image1, cv2_rot_params[index_of_max]) + # rotated_image1 = cv2.cvtColor(rotated_image1, cv2.COLOR_BGR2GRAY) + p = image1.rotate(cv2_rot_params[index_of_max], expand=False) + p.save(str(path_to_upright_dir / img1)) processed.append(img1) break if index_of_max == 0 and n_matches > 100: processed.append(img1) - image1 = cv2.imread(str(path_to_upright_dir / img1)) - image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY) - cv2.imwrite(str(path_to_upright_dir / img1), image1) - print(f"ref {img0} NOT rotated {img1} {rotations[index_of_max]}") + image1 = Image.open(str(path_to_upright_dir / img1)).convert("L") + # image1 = cv2.imread(str(path_to_upright_dir / img1)) + # image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY) + image1.save(str(path_to_upright_dir / img1)) + # cv2.imwrite(str(path_to_upright_dir / img1), image1) + print( + f"ref {img0} NOT rotated {img1} {rotations[index_of_max]}" + ) break return processed, rotated_images @@ -376,13 +415,22 @@ def generate_pairs(self, **kwargs) -> Path: return self.pair_file - def rotate_upright_images(self, resize_size=1000): + def rotate_upright_images( + self, resize_size=500, n_cores=1, 
-def prova(
+def upright(
     cluster0,
     path_to_upright_dir,
     rotations,
@@ -103,6 +128,8 @@ def prova(
     SPextractor,
     LGmatcher,
     pairs,
+    resize_size,
+    processed_pairs,
     cluster1,
 ):
     processed = []
@@ -111,14 +138,19 @@
     for j, img1 in enumerate(cluster1):
         # for i, img0 in enumerate(cluster0):
         for i in range(len(cluster0)):
             img0 = cluster0[len(cluster0) - 1 - i]
-            if True:#(img0, img1) in pairs or (img1, img0) in pairs:
-                print('inside pairs')
-
-
-                print(j, i, len(cluster1), len(cluster0))
+            # Process only pairs proposed by the pair generator (bruteforce,
+            # global descriptors, ...) that have not been checked yet.
+            if (
+                ((img0, img1) in pairs or (img1, img0) in pairs)
+                and (img0, img1) not in processed_pairs
+                and (img1, img0) not in processed_pairs
+            ):
+                processed_pairs.append((img0, img1))
                 matchesXrotation = find_matches_per_rotation(
                     str(path_to_upright_dir / img0),
                     str(path_to_upright_dir / img1),
@@ -126,6 +158,7 @@
                     cv2_rot_params,
                     SPextractor,
                     LGmatcher,
+                    resize_size,
                 )
                 index_of_max = max(
                     range(len(matchesXrotation)),
@@ -135,18 +168,24 @@
                 if index_of_max != 0 and n_matches > 100:
                     print(f"ref {img0} rotated {img1} {rotations[index_of_max]}")
                     rotated_images.append((img1, rotations[index_of_max]))
-                    image1 = cv2.imread(str(path_to_upright_dir / img1))
-                    rotated_image1 = cv2.rotate(image1, cv2_rot_params[index_of_max])
-                    rotated_image1 = cv2.cvtColor(rotated_image1, cv2.COLOR_BGR2GRAY)
-                    cv2.imwrite(str(path_to_upright_dir / img1), rotated_image1)
+                    # expand=True avoids cropping non-square images on 90/270 rotations
+                    image1 = Image.open(str(path_to_upright_dir / img1)).convert("L")
+                    rotated_image1 = image1.rotate(cv2_rot_params[index_of_max], expand=True)
+                    rotated_image1.save(str(path_to_upright_dir / img1))
                     processed.append(img1)
                     break
                 if index_of_max == 0 and n_matches > 100:
                     processed.append(img1)
-                    image1 = cv2.imread(str(path_to_upright_dir / img1))
-                    image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY)
-                    cv2.imwrite(str(path_to_upright_dir / img1), image1)
-                    print(f"ref {img0} NOT rotated {img1} {rotations[index_of_max]}")
+                    # Save a grayscale copy so all upright images share one format
+                    image1 = Image.open(str(path_to_upright_dir / img1)).convert("L")
+                    image1.save(str(path_to_upright_dir / img1))
+                    print(f"ref {img0} NOT rotated {img1} {rotations[index_of_max]}")
                     break
     return processed, rotated_images

@@ -376,13 +415,22 @@ def generate_pairs(self, **kwargs) -> Path:
         return self.pair_file

-    def rotate_upright_images(self, resize_size=1000):
+    def rotate_upright_images(
+        self, resize_size=500, n_cores=1, multi_processing=False
+    ) -> None:
+        """
+        Try to rotate images upright; useful for matching approaches that are
+        not rotation invariant. Rotated images are saved in the
+        'upright_images' dir inside the results folder.
+
+        Returns:
+            None
+        """
+        gc.collect()
         logger.info("Rotating images upright...")
         pairs = [(item[0].name, item[1].name) for item in self.pairs]
         path_to_upright_dir = self.output_dir / "upright_images"
         os.makedirs(path_to_upright_dir, exist_ok=False)
         images = os.listdir(self.image_dir)
-        processed_images = []

         logger.info(f"Copying images to {path_to_upright_dir}")
         for img in images:
@@ -390,13 +438,21 @@
         logger.info(f"{len(images)} images copied")

         rotations = [0, 90, 180, 270]
         cv2_rot_params = [
             None,
-            cv2.ROTATE_90_CLOCKWISE,
-            cv2.ROTATE_180,
-            cv2.ROTATE_90_COUNTERCLOCKWISE,
+            -90,
+            180,
+            90,
         ]
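+        # PIL's Image.rotate() takes counterclockwise degrees: -90 replaces
+        # cv2.ROTATE_90_CLOCKWISE, 180 replaces cv2.ROTATE_180, and 90
+        # replaces cv2.ROTATE_90_COUNTERCLOCKWISE.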
         self.rotated_images = []
+
+        logger.info("Initializing SuperPoint + LightGlue...")
         SPextractor = SuperPointExtractor(
             config={
                 "general": {},
@@ -416,69 +472,10 @@
             },
         )
-        features = {
-            "feat0": None,
-            "feat1": None,
-        }

-        def resize(img, new_width):
-            H, W = img.shape[:2]
-            new_height = int(H * 500 / W)
-            img = cv2.resize(img, (new_width, new_height))
-            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
-            return img
-
-        # @torch.no_grad() # probably not useful
-        def find_matches_per_rotation(
-            path_to_img0, path_to_img1, rotations, cv2_rot_params
-        ):
-            image0 = cv2.imread(path_to_img0)
-            image0 = resize(image0, resize_size)
-            image1 = cv2.imread(path_to_img1)
-            image1 = resize(image1, resize_size)
-            features["feat0"] = SPextractor._extract(image0)
-            matchesXrotation = []
-            for rotation, cv2rotation in zip(rotations, cv2_rot_params):
-                _image1 = image1
-                if rotation != 0:
-                    _image1 = cv2.rotate(_image1, cv2rotation)
-                features["feat1"] = SPextractor._extract(_image1)
-                matches = LGmatcher._match_pairs(features["feat0"], features["feat1"])
-
-                matchesXrotation.append((rotation, matches.shape[0]))
-            return matchesXrotation
-
-        # for pair in tqdm(self.pairs):
-        #    # Reference image
-        #    ref_image = pair[0].name
-        #    image0 = cv2.imread(str(path_to_upright_dir / ref_image))
-        #    image0 = resize(image0, resize_size)
-        #    features["feat0"] = SPextractor._extract(image0)
-        #
-        #    # Target image - find the best rotation
-        #    target_img = pair[1].name
-        #    if target_img not in processed_images:
-        #        #processed_images.append(target_img)
-        #        image1 = cv2.imread(str(path_to_upright_dir / target_img))
-        #        matchesXrotation = find_matches_per_rotation(image1, rotations, cv2_rot_params)
-        #        index_of_max = max(
-        #            range(len(matchesXrotation)), key=lambda i: matchesXrotation[i][1]
-        #        )
-        #        n_matches = matchesXrotation[index_of_max][1]
-        #        if index_of_max != 0 and n_matches > 100:
-        #            processed_images.append(target_img)
-        #            self.rotated_images.append((pair[1].name, rotations[index_of_max]))
-        #            rotated_image1 = cv2.rotate(image1, cv2_rot_params[index_of_max])
-        #            cv2.imwrite(str(path_to_upright_dir / target_img), rotated_image1)
-
-        logger.info(f"Rotating images 'upright'")
         cluster0 = []
         cluster1 = os.listdir(path_to_upright_dir)
-        import random

         SPextractor = SuperPointExtractor(
             config={
                 "general": {},
@@ -499,82 +496,80 @@ def find_matches_per_rotation(
             },
         )

+        # Pick a random seed image; its orientation becomes the reference.
+        # randint's upper bound is inclusive, hence len(cluster1) - 1.
-        random_first_img = random.randint(0, len(cluster1))
-        random_first_img = 1 #15
-        print(cluster1[random_first_img])
+        random_first_img = random.randint(0, len(cluster1) - 1)
         cluster0.append(cluster1[random_first_img])
         cluster1.pop(random_first_img)

-        iterations = 0
-        max_iter = 130
-
-        while True:
-            iterations += 1
-            rotated = []
-            print("cluster0", cluster0)
-            print("cluster1", cluster1)
-
-            rotated.sort
-            #cluster0 = []
-            for r in reversed(rotated):
-                cluster0.append(cluster1[r])
-            for r in reversed(rotated):
-                cluster1.pop(r)
-
-            if iterations == max_iter or len(cluster1) == 0:
-                print("max_iter", max_iter)
-                print("len(cluster1)", len(cluster1))
-                break
-
-            N_CORES = 4
-            print(cluster0)
-            print(cluster1)
-            partial_prova = partial(
-                prova,
-                cluster0,
-                path_to_upright_dir,
-                rotations,
-                cv2_rot_params,
-                SPextractor,
-                LGmatcher,
-                pairs,
-            )
-            sublists = np.array_split(cluster1, N_CORES)
-            with Pool(N_CORES) as p:
-                results = p.map(partial_prova, sublists)
-            print(len(sublists[0]))
-            print(len(sublists[1]))
-            print(len(sublists[2]))
-            print(len(sublists[3]))
-
-            processed = [item[0] for item in results]
-            rotated = [item[1] for item in results]
-
-            processed = [item for sublist in processed for item in sublist if item]
-            self.rotated_images = self.rotated_images + [item[0] for item in rotated if item != []]
-
-            #print(processed)
-            #print(rotated)
-            #quit()
-            #self.rotated_images = [item for item in rotated_images]
+        # Main loop: at most one iteration per image; stops early once no new
+        # image can be oriented
+        processed_pairs = []
+        max_iter = len(images)
+        logger.info(f"Maximum number of iterations: {max_iter}")
+        for _ in tqdm(range(max_iter)):
+            logger.info(f"len(cluster0): {len(cluster0)}\t len(cluster1): {len(cluster1)}")
+            last_cluster1_len = len(cluster1)
+
+            if multi_processing:
+                # Each worker receives a copy of processed_pairs; updates made
+                # inside a worker are not shared back across processes.
+                partial_upright = partial(
+                    upright,
+                    cluster0,
+                    path_to_upright_dir,
+                    rotations,
+                    cv2_rot_params,
+                    SPextractor,
+                    LGmatcher,
+                    pairs,
+                    resize_size,
+                    processed_pairs,
+                )
+                sublists = np.array_split(cluster1, n_cores)
+                with Pool(n_cores) as p:
+                    results = p.map(partial_upright, sublists)
+                processed = [img for result in results for img in result[0] if img]
+                rotated = [rot for result in results for rot in result[1]]
+            else:
+                processed, rotated = upright(
+                    cluster0,
+                    path_to_upright_dir,
+                    rotations,
+                    cv2_rot_params,
+                    SPextractor,
+                    LGmatcher,
+                    pairs,
+                    resize_size,
+                    processed_pairs,
+                    cluster1,
+                )
+            self.rotated_images = self.rotated_images + rotated

             for r in processed:
                 cluster0.append(r)
             cluster1 = [name for name in cluster1 if name not in cluster0]

-            if iterations == max_iter or len(cluster1) == 0:
-                print("max_iter", max_iter)
-                print("len(cluster1)", len(cluster1))
+            # Stop when nothing new was oriented or no image is left
+            if last_cluster1_len == len(cluster1) or len(cluster1) == 0:
                 break

         out_file = self.pair_file.parent / f"{self.pair_file.stem}_rot.txt"
-        print('out_file', out_file)
         with open(out_file, "w") as txt_file:
             for element in self.rotated_images:
-                print(element)
                 txt_file.write(f"{element[0]} {element[1]}\n")
-        #quit()

         # Update image directory to the dir with upright images
         # Features will be rotated accordingly on export, if the images have been rotated
@@ -584,6 +579,7 @@
         torch.cuda.empty_cache()
         logger.info(f"Images rotated and saved in {path_to_upright_dir}")
+        gc.collect()

     def extract_features(self) -> Path:
         """
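
# Usage sketch (hedged): assumes the enclosing class is `ImageMatching` and is
# constructed elsewhere with image/output dirs; only generate_pairs,
# rotate_upright_images (with the new keywords), and extract_features are
# shown in the patch above.
#
#   matcher = ImageMatching(...)   # hypothetical setup, not part of this patch
#   matcher.generate_pairs()
#   matcher.rotate_upright_images(
#       resize_size=500,           # width used for the rotation check
#       n_cores=4,                 # worker processes for the Pool
#       multi_processing=True,     # split cluster1 across n_cores workers
#   )
#   matcher.extract_features()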