diff --git a/README.md b/README.md
index 367f4be..559d937 100644
--- a/README.md
+++ b/README.md
@@ -75,7 +75,7 @@ You may use [YOLO](https://docs.ultralytics.com/) to automatically perform detec
 Detect objects with Ultralytics YOLO detections, apply SORT tracking and convert tracks to CVAT format.
 
 ```
-detector2cvat --video path_to_videos --save path_to_save
+detector2cvat --video path_to_videos --save path_to_save [--imshow]
 ```
 
diff --git a/src/kabr_tools/cvat2slowfast.py b/src/kabr_tools/cvat2slowfast.py
index 54cd4de..fbdcd92 100644
--- a/src/kabr_tools/cvat2slowfast.py
+++ b/src/kabr_tools/cvat2slowfast.py
@@ -1,5 +1,6 @@
 import os
 import sys
+from typing import Optional
 import argparse
 import json
 from lxml import etree
@@ -9,9 +10,8 @@ import cv2
 
 
-def cvat2slowfast(path_to_mini_scenes, path_to_new_dataset, label2number, old2new):
-    number2label = {value: key for key, value in label2number.items()}
-
+def cvat2slowfast(path_to_mini_scenes: str, path_to_new_dataset: str,
+                  label2number: dict, old2new: Optional[dict]) -> None:
     if not os.path.exists(path_to_new_dataset):
         os.makedirs(path_to_new_dataset)
 
 
@@ -143,7 +143,7 @@ def cvat2slowfast(path_to_mini_scenes, path_to_new_dataset, label2number, old2ne
         f"{path_to_new_dataset}/annotation/data.csv", sep=" ", index=False)
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
         '--miniscene',
@@ -172,7 +172,7 @@ def parse_args():
     return local_parser.parse_args()
 
 
-def main():
+def main() -> None:
     args = parse_args()
 
     with open(args.classes, mode='r', encoding='utf-8') as file:
diff --git a/src/kabr_tools/cvat2ultralytics.py b/src/kabr_tools/cvat2ultralytics.py
index bc0dd4c..5b757c8 100644
--- a/src/kabr_tools/cvat2ultralytics.py
+++ b/src/kabr_tools/cvat2ultralytics.py
@@ -1,4 +1,5 @@
 import os
+from typing import Optional
 import argparse
 import json
 import cv2
@@ -10,7 +11,9 @@ from natsort import natsorted
 
 
-def cvat2ultralytics(video_path, annotation_path, dataset, skip, label2index=None):
+def cvat2ultralytics(video_path: str, annotation_path: str,
+                     dataset: str, skip: int,
+                     label2index: Optional[dict] = None) -> None:
     # Create a YOLO dataset structure.
     dataset_file = f"""
 path: {dataset}
@@ -169,7 +172,7 @@ def cvat2ultralytics(video_path, annotation_path, dataset, skip, label2index=Non
         shutil.move(f"{dataset}/labels/train/{file}", f"{dataset}/labels/test/{file}")
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
         '--video',
@@ -204,7 +207,7 @@ def parse_args():
     return local_parser.parse_args()
 
 
-def main():
+def main() -> None:
     args = parse_args()
 
     if args.label2index:
diff --git a/src/kabr_tools/detector2cvat.py b/src/kabr_tools/detector2cvat.py
index 6a4c5b8..52df627 100644
--- a/src/kabr_tools/detector2cvat.py
+++ b/src/kabr_tools/detector2cvat.py
@@ -8,8 +8,7 @@ from kabr_tools.utils.draw import Draw
 
 
-
-def detector2cvat(path_to_videos, path_to_save):
+def detector2cvat(path_to_videos: str, path_to_save: str, show: bool) -> None:
     videos = []
 
     for root, dirs, files in os.walk(path_to_videos):
@@ -77,7 +76,9 @@ def detector2cvat(path_to_videos, path_to_save):
 
                 cv2.putText(visualization, f"Frame: {index}", (50, 50),
                             cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 3, cv2.LINE_AA)
-                cv2.imshow("detector2cvat", cv2.resize(visualization, (int(width // 2.5), int(height // 2.5))))
+                if show:
+                    cv2.imshow("detector2cvat", cv2.resize(
+                        visualization, (int(width // 2.5), int(height // 2.5))))
                 vw.write(visualization)
                 key = cv2.waitKey(1)
                 index += 1
@@ -97,26 +98,31 @@ def detector2cvat(path_to_videos, path_to_save):
             print("Something went wrong...")
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
-        '--video',
+        "--video",
         type=str,
-        help='path to folder containing videos',
+        help="path to folder containing videos",
         required=True
     )
     local_parser.add_argument(
-        '--save',
+        "--save",
         type=str,
-        help='path to save output xml & mp4 files',
+        help="path to save output xml & mp4 files",
         required=True
     )
+    local_parser.add_argument(
+        "--imshow",
+        action="store_true",
+        help="flag to display detector's visualization"
+    )
     return local_parser.parse_args()
 
 
-def main():
+def main() -> None:
     args = parse_args()
-    detector2cvat(args.video, args.save)
+    detector2cvat(args.video, args.save, args.imshow)
 
 
 if __name__ == "__main__":
diff --git a/src/kabr_tools/miniscene2behavior.py b/src/kabr_tools/miniscene2behavior.py
index 022ffce..430ecdd 100644
--- a/src/kabr_tools/miniscene2behavior.py
+++ b/src/kabr_tools/miniscene2behavior.py
@@ -1,28 +1,34 @@
 import sys
+import argparse
 import torch
 from lxml import etree
 import pandas as pd
 import cv2
-import argparse
 from tqdm import tqdm
 import slowfast.utils.checkpoint as cu
-import slowfast.models.build as build
-import slowfast.utils.parser as parser
+from slowfast.models import build
+from slowfast.utils import parser
 from slowfast.datasets.utils import get_sequence
 from slowfast.visualization.utils import process_cv2_inputs
 from slowfast.datasets.cv2_transform import scale
+from fvcore.common.config import CfgNode
+from torch import Tensor
 
 
-def get_input_clip(cap, cfg, keyframe_idx):
+def get_input_clip(cap: cv2.VideoCapture, cfg: CfgNode, keyframe_idx: int) -> list[Tensor]:
     # https://github.com/facebookresearch/SlowFast/blob/bac7b672f40d44166a84e8c51d1a5ba367ace816/slowfast/visualization/ava_demo_precomputed_boxes.py
     seq_length = cfg.DATA.NUM_FRAMES * cfg.DATA.SAMPLING_RATE
     total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+    assert keyframe_idx < total_frames, f"keyframe_idx: {keyframe_idx}" \
+        f">= total_frames: {total_frames}"
     seq = get_sequence(
         keyframe_idx,
         seq_length // 2,
         cfg.DATA.SAMPLING_RATE,
         total_frames,
     )
+    # TODO: remove after debugging
+    print(keyframe_idx, seq[0], seq[-1], total_frames)
     clip = []
     for frame_idx in seq:
         cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
@@ -32,67 +38,67 @@ def get_input_clip(cap, cfg, keyframe_idx):
             frame = scale(cfg.DATA.TEST_CROP_SIZE, frame)
             clip.append(frame)
         else:
-            print('Unable to read frame. Duplicating previous frame.')
+            print("Unable to read frame. Duplicating previous frame.")
             clip.append(clip[-1])
     clip = process_cv2_inputs(clip, cfg)
     return clip
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
-        '--config',
+        "--config",
         type=str,
-        help='model config.yml filepath',
-        default='config.yml'
+        help="model config.yml filepath",
+        default="config.yml"
     )
     local_parser.add_argument(
-        '--checkpoint',
+        "--checkpoint",
        type=str,
-        help='model checkpoint.pyth filepath',
+        help="model checkpoint.pyth filepath",
         required=True
     )
     local_parser.add_argument(
-        '--gpu_num',
+        "--gpu_num",
         type=int,
-        help='number of gpus',
+        help="number of gpus",
         default=0
     )
     local_parser.add_argument(
-        '--miniscene',
+        "--miniscene",
         type=str,
-        help='miniscene folder containing miniscene\'s tracks.xml & *.mp4',
+        help="miniscene folder containing miniscene's tracks.xml & *.mp4",
         required=True
    )
     local_parser.add_argument(
-        '--video',
+        "--video",
         type=str,
-        help='name of video (expect video_tracks.xml from tracks_extractor)',
+        help="name of video (expect video_tracks.xml from tracks_extractor)",
         required=True
     )
     local_parser.add_argument(
-        '--output',
+        "--output",
         type=str,
-        help='filepath for output csv',
-        default='annotation_data.csv'
+        help="filepath for output csv",
+        default="annotation_data.csv"
     )
     return local_parser.parse_args()
 
 
-def create_model(config_path, checkpoint_path, gpu_num):
+def create_model(config_path: str, checkpoint_path: str, gpu_num: int) -> tuple[CfgNode, torch.nn.Module]:
     # load model config
     try:
         cfg = parser.load_config(parser.parse_args(), config_path)
     except FileNotFoundError:
         checkpoint = torch.load(
-            checkpoint_path, map_location=torch.device('cpu'))
-        with open(config_path, 'w') as file:
-            file.write(checkpoint['cfg'])
+            checkpoint_path, map_location=torch.device("cpu"))
+        with open(config_path, "w") as file:
+            file.write(checkpoint["cfg"])
         cfg = parser.load_config(parser.parse_args(), config_path)
     cfg.NUM_GPUS = gpu_num
-    cfg.OUTPUT_DIR = ''
+    cfg.OUTPUT_DIR = ""
     model = build.build_model(cfg)
 
     # load model checkpoint
@@ -103,34 +109,42 @@ def create_model(config_path, checkpoint_path, gpu_num):
     return cfg, model
 
 
-def annotate_miniscene(cfg, model, miniscene_path, video, output_path):
+def annotate_miniscene(cfg: CfgNode, model: torch.nn.Module,
+                       miniscene_path: str, video: str,
+                       output_path: str) -> None:
     label_data = []
-    track_file = f'{miniscene_path}/metadata/{video}_tracks.xml'
+    track_file = f"{miniscene_path}/metadata/{video}_tracks.xml"
     root = etree.parse(track_file).getroot()
 
     # find all tracks
     tracks = []
+    frames = {}
     for track in root.iterfind("track"):
         track_id = track.attrib["id"]
         tracks.append(track_id)
+        frames[track_id] = []
 
-        # find all frames
-        frames = []
-        for box in track.iterfind("box"):
-            frames.append(int(box.attrib['frame']))
+        # find all frames
+        for box in track.iterfind("box"):
+            frames[track_id].append(int(box.attrib["frame"]))
 
     # run model on miniscene
     for track in tracks:
         video_file = f"{miniscene_path}/{track}.mp4"
         cap = cv2.VideoCapture(video_file)
-        for frame in tqdm(frames, desc=f'{track} frames'):
-            inputs = get_input_clip(cap, cfg, frame)
+        print(f"{track=}")
+        for index, frame in tqdm(enumerate(frames[track]), desc=f"{track} frames"):
+            try:
+                inputs = get_input_clip(cap, cfg, index)
+            except AssertionError as e:
+                print(e)
+                break
 
             if cfg.NUM_GPUS:
                 # transfer the data to the current GPU device.
                 if isinstance(inputs, (list,)):
-                    for i in range(len(inputs)):
-                        inputs[i] = inputs[i].cuda(non_blocking=True)
+                    for i, input_clip in enumerate(inputs):
+                        inputs[i] = input_clip.cuda(non_blocking=True)
                 else:
                     inputs = inputs.cuda(non_blocking=True)
@@ -140,17 +154,18 @@ def annotate_miniscene(cfg, model, miniscene_path, video, output_path):
             if cfg.NUM_GPUS:
                 preds = preds.cpu()
 
-            label_data.append({'video': video,
-                               'track': track,
-                               'frame': frame,
-                               'label': torch.argmax(preds).item()})
+            label_data.append({"video": video,
+                               "track": track,
+                               "frame": frame,
+                               "label": torch.argmax(preds).item()})
             if frame % 20 == 0:
                 pd.DataFrame(label_data).to_csv(
-                    output_path, sep=' ', index=False)
-    pd.DataFrame(label_data).to_csv(output_path, sep=' ', index=False)
+                    output_path, sep=" ", index=False)
+        cap.release()
+    pd.DataFrame(label_data).to_csv(output_path, sep=" ", index=False)
 
 
-def main():
+def main() -> None:
     # clear arguments to avoid slowfast parsing issues
     args = parse_args()
     sys.argv = [sys.argv[0]]
@@ -159,5 +174,5 @@ def main():
                         args.video, args.output)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/src/kabr_tools/player.py b/src/kabr_tools/player.py
index d440085..6c4a83f 100644
--- a/src/kabr_tools/player.py
+++ b/src/kabr_tools/player.py
@@ -1,12 +1,13 @@
 import os
 import argparse
 import json
-from lxml import etree
 from collections import OrderedDict
+from lxml import etree
 import cv2
+from cv2.typing import MatLike
 
 
-def on_slider_change(value):
+def on_slider_change(value: int) -> None:
     global index, vcs, current, trackbar_position, paused, updated
     index = value
 
@@ -17,7 +18,7 @@ def on_slider_change(value):
     updated = True
 
 
-def pad(image, width, height):
+def pad(image: MatLike, width: int, height: int) -> MatLike:
     shape_0, shape_1 = image.shape[0], image.shape[1]
 
     if shape_0 < shape_1:
@@ -34,7 +35,7 @@ def pad(image, width, height):
     return padded
 
 
-def draw_aim(current, image):
+def draw_aim(current: str, image: MatLike) -> MatLike:
     if current == "main":
         return image
 
@@ -47,7 +48,8 @@ def draw_aim(current, image):
     return cv2.addWeighted(image, 0.4, copied, 0.6, 0.0)
 
 
-def draw_id(current, image, metadata, width):
+def draw_id(current: str, image: MatLike,
+            metadata: dict, width: int) -> MatLike:
     if current == "main":
         label = f"Drone View"
         color = (127, 127, 127)
@@ -68,7 +70,9 @@ def draw_id(current, image, metadata, width):
     return cv2.addWeighted(image, 0.4, copied, 0.6, 0.0)
 
 
-def draw_actions(current, index, image, actions, metadata, width, height):
+def draw_actions(current: str, index: int,
+                 image: MatLike, actions: OrderedDict,
+                 metadata: dict, width: int, height: int) -> MatLike:
     if current == "main":
         return image
 
@@ -92,7 +96,7 @@ def draw_actions(current, index, image, actions, metadata, width, height):
     return cv2.addWeighted(image, 0.4, copied, 0.6, 0.0)
 
 
-def draw_info(image, width):
+def draw_info(image: MatLike, width: int) -> MatLike:
     copied = image.copy()
     cv2.rectangle(image, (width - 600, 100), (width - 100, 340), (0, 0, 0), -1)
     cv2.putText(image, "[0-9]: Show Track #[0-9]", (width - 565, 150),
@@ -107,7 +111,7 @@ def draw_info(image, width):
     return cv2.addWeighted(image, 0.4, copied, 0.6, 0.0)
 
 
-def hotkey(key):
+def hotkey(key: int) -> None:
     global current, metadata, vc, letter2hotkey
     mapped = letter2hotkey[key]
 
@@ -130,7 +134,7 @@ def hotkey(key):
             vc.set(cv2.CAP_PROP_POS_FRAMES, metadata["tracks"][current][index])
 
 
-def player(folder, save):
+def player(folder: str, save: bool, show: bool) -> None:
     name = folder.split("/")[-1].split('|')[-1]
     metadata_path = f"{folder}/metadata/{name}_metadata.json"
 
@@ -212,9 +216,11 @@ def player(folder, save):
             cv2.setTrackbarPos(name, "TrackPlayer", index)
             cv2.putText(visualization, f"Frame: {index}", (50, 50),
                         cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 3, cv2.LINE_AA)
-
-            cv2.imshow("TrackPlayer", cv2.resize(visualization, (int(target_width // 2.5), int(target_height // 2.5)),
-                                                 interpolation=cv2.INTER_AREA))
+            if show:
+                cv2.imshow("TrackPlayer",
+                           cv2.resize(visualization,
+                                      (int(target_width // 2.5), int(target_height // 2.5)),
+                                      interpolation=cv2.INTER_AREA))
 
             if save:
                 vw.write(visualization)
@@ -269,25 +275,30 @@ def player(folder, save):
     cv2.destroyAllWindows()
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
-        '--folder',
+        "--folder",
         type=str,
-        help='path to folder with metadata and actions',
+        help="path to folder with metadata and actions",
         required=True
     )
     local_parser.add_argument(
-        '--save',
-        action='store_true',
-        help='Flag to save video'
+        "--save",
+        action="store_true",
+        help="flag to save video"
+    )
+    local_parser.add_argument(
+        "--imshow",
+        action="store_true",
+        help="flag to display player's visualization"
     )
     return local_parser.parse_args()
 
 
-def main():
+def main() -> None:
     args = parse_args()
-    player(args.folder, args.save)
+    player(args.folder, args.save, args.imshow)
 
 
 if __name__ == "__main__":
diff --git a/src/kabr_tools/tracks_extractor.py b/src/kabr_tools/tracks_extractor.py
index 4beff99..112ac8e 100644
--- a/src/kabr_tools/tracks_extractor.py
+++ b/src/kabr_tools/tracks_extractor.py
@@ -15,7 +15,7 @@ from tqdm import tqdm
 
 
-def generate_timeline_image(name, folder, timeline, annotated_size):
+def generate_timeline_image(name: str, folder: str, timeline: OrderedDict, annotated_size: int) -> None:
     timeline_image = np.zeros(shape=(len(timeline["tracks"].keys()) * 100,
                                      annotated_size, 3), dtype=np.uint8)
 
     for i, (key, value) in enumerate(timeline["tracks"].items()):
@@ -47,7 +47,7 @@ def generate_timeline_image(name, folder, timeline, annotated_size):
     cv2.imwrite(f"mini-scenes/{folder}/metadata/{name}.jpg", timeline_resized)
 
 
-def extract(video_path, annotation_path, tracking, show):
+def extract(video_path: str, annotation_path: str, tracking: bool, show: bool) -> None:
     # Parse CVAT for video 1.1 annotation file.
     root = etree.parse(annotation_path).getroot()
     annotated = dict()
@@ -180,7 +180,7 @@ def extract(video_path, annotation_path, tracking, show):
         vw.release()
 
     cv2.destroyAllWindows()
-def tracks_extractor(video, annotation, tracking, show):
+def tracks_extractor(video: str, annotation: str, tracking: bool, show: bool) -> None:
     if os.path.isdir(annotation):
         videos = []
         annotations = []
@@ -208,34 +208,34 @@ def tracks_extractor(video, annotation, tracking, show):
         extract(video, annotation, tracking, show)
 
 
-def parse_args():
+def parse_args() -> argparse.Namespace:
     local_parser = argparse.ArgumentParser()
     local_parser.add_argument(
-        '--video',
+        "--video",
         type=str,
-        help='path to folder containing videos',
+        help="path to folder containing videos",
         required=True
     )
     local_parser.add_argument(
-        '--annotation',
+        "--annotation",
         type=str,
-        help='path to folder containing annotations',
+        help="path to folder containing annotations",
         required=True
     )
     local_parser.add_argument(
-        '--tracking',
-        action='store_true',
-        help='Flag to use external tracker instead of CVAT tracks'
+        "--tracking",
+        action="store_true",
+        help="flag to use external tracker instead of CVAT tracks"
    )
     local_parser.add_argument(
-        '--imshow',
-        action='store_true',
-        help='Flag to display tracks\' visualization'
+        "--imshow",
+        action="store_true",
+        help="flag to display tracks' visualization"
     )
     return local_parser.parse_args()
 
 
-def main():
+def main() -> None:
     args = parse_args()
     tracks_extractor(args.video, args.annotation, args.tracking, args.imshow)
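
For reference, a minimal usage sketch of the `--imshow` flags introduced in this patch. The `detector2cvat` invocation follows the README; treating `player` as an installed console entry point in the same way is an assumption, and the path placeholders are illustrative only:

```
detector2cvat --video path_to_videos --save path_to_save --imshow
player --folder path_to_folder --save --imshow
```

Omitting `--imshow` skips the on-screen preview, since the `cv2.imshow` calls are now guarded by the new `show` argument, while the saved XML/MP4 outputs are unaffected.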