diff --git a/src/hope_dedup_engine/apps/faces/celery/pipeline.py b/src/hope_dedup_engine/apps/faces/celery/pipeline.py
index bf65055..4a937d5 100644
--- a/src/hope_dedup_engine/apps/faces/celery/pipeline.py
+++ b/src/hope_dedup_engine/apps/faces/celery/pipeline.py
@@ -5,13 +5,15 @@
 from hope_dedup_engine.apps.api.models import DeduplicationSet
 from hope_dedup_engine.apps.faces.celery.tasks.deduplication import (
     encode_images,
-    filter_ignored_pairs,
     find_duplicates,
     get_deduplication_set_embedding_pairs,
     get_deduplication_set_image_files,
     save_encoding_errors_in_findings,
 )
-from hope_dedup_engine.utils.celery.utility_tasks import parallelize
+from hope_dedup_engine.utils.celery.utility_tasks import (
+    batched_compact_pairs,
+    parallelize,
+)
 
 IMAGE_ENCODING_BATCH_SIZE = 50
 DUPLICATE_FINDING_BATCH_SIZE = 200
@@ -27,9 +29,9 @@ def image_pipeline(
     )
     find_duplicates_pipeline = parallelize.si(
         get_deduplication_set_embedding_pairs.s(deduplication_set.id),
-        filter_ignored_pairs.s(deduplication_set.id)
-        | find_duplicates.s(deduplication_set.id, config.get("deduplicate", {})),
+        find_duplicates.s(deduplication_set.id, config.get("deduplicate", {})),
         DUPLICATE_FINDING_BATCH_SIZE,
+        splitter=batched_compact_pairs.s(),
     )
 
     return (
diff --git a/src/hope_dedup_engine/apps/faces/celery/tasks/deduplication.py b/src/hope_dedup_engine/apps/faces/celery/tasks/deduplication.py
index 2e6d435..51c7335 100644
--- a/src/hope_dedup_engine/apps/faces/celery/tasks/deduplication.py
+++ b/src/hope_dedup_engine/apps/faces/celery/tasks/deduplication.py
@@ -1,5 +1,4 @@
-from collections.abc import Iterator
-from itertools import combinations
+from collections.abc import Generator, Iterable
 from typing import Any
 
 from hope_dedup_engine.apps.api.models import DeduplicationSet
@@ -10,6 +9,7 @@
 from hope_dedup_engine.config.celery import app
 from hope_dedup_engine.constants import FacialError
 from hope_dedup_engine.types import EntityEmbedding, Filename, SortedTuple
+from hope_dedup_engine.utils import compact_pairs
 from hope_dedup_engine.utils.celery.task_result import wrapped
 
 
@@ -41,11 +41,24 @@ def encode_images(
     deduplication_set.update_encoding_errors(errors)
 
 
+def filter_ignored_pairs(
+    embedding_pairs: Iterable[tuple[EntityEmbedding, EntityEmbedding]],
+    deduplication_set: DeduplicationSet,
+) -> Generator[tuple[EntityEmbedding, EntityEmbedding], None, None]:
+    ignored_pairs = deduplication_set.get_ignored_pairs()
+    for embedding_pair in embedding_pairs:
+        first, second = embedding_pair
+        first_reference_pk, _ = first
+        second_reference_pk, _ = second
+        if SortedTuple((first_reference_pk, second_reference_pk)) not in ignored_pairs:
+            yield embedding_pair
+
+
 @app.task
 @wrapped
 def get_deduplication_set_embedding_pairs(
     deduplication_set_id: str,
-) -> Iterator[tuple[EntityEmbedding, EntityEmbedding]]:
+) -> compact_pairs.CompactPairs[EntityEmbedding]:
     deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
         pk=deduplication_set_id
     )
@@ -58,41 +71,20 @@
         if filename in deduplication_set.encodings
     )
 
-    return combinations(entity_embeddings, 2)
-
-
-@app.task
-@wrapped
-def filter_ignored_pairs(
-    embedding_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
-    deduplication_set_id: str,
-) -> list[tuple[EntityEmbedding, EntityEmbedding]]:
-    deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
-        pk=deduplication_set_id
-    )
-    ignored_pairs = deduplication_set.get_ignored_pairs()
-    filtered = []
-    for embedding_pair in embedding_pairs:
-        first, second = embedding_pair
-        first_reference_pk, _ = first
-        second_reference_pk, _ = second
-        if SortedTuple((first_reference_pk, second_reference_pk)) not in ignored_pairs:
-            filtered.append(embedding_pair)
-
-    return filtered
+    return compact_pairs.from_collection(entity_embeddings)
 
 
 @app.task
 @wrapped
 def find_duplicates(
-    embedding_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
+    embedding_pairs: compact_pairs.CompactPairs[EntityEmbedding],
     deduplication_set_id: str,
     deduplicate_config: dict[str, Any],
 ) -> None:
     """Deduplicate faces in a chunk of files."""
     deduplication_set = DeduplicationSet.objects.get(pk=deduplication_set_id)
     findings = find_similar_faces(
-        embedding_pairs,
+        filter_ignored_pairs(compact_pairs.unwrap(embedding_pairs), deduplication_set),
         dedupe_threshold=deduplicate_config.get("threshold"),
         options=deduplicate_config,
     )
diff --git a/src/hope_dedup_engine/apps/faces/services/facial.py b/src/hope_dedup_engine/apps/faces/services/facial.py
index 0a36718..65b1c7c 100644
--- a/src/hope_dedup_engine/apps/faces/services/facial.py
+++ b/src/hope_dedup_engine/apps/faces/services/facial.py
@@ -1,5 +1,5 @@
 import logging
-from collections.abc import Generator
+from collections.abc import Generator, Iterable
 from typing import Any, cast
 
 from deepface import DeepFace
@@ -56,7 +56,7 @@ def face_similarity(first: Embedding, second: Embedding, **options: Any) -> floa
 
 
 def find_similar_faces(
-    encoded_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
+    encoded_pairs: Iterable[tuple[EntityEmbedding, EntityEmbedding]],
     dedupe_threshold: float,
     options: dict[str, Any],
 ) -> Generator[tuple[EncodedFace, EncodedFace, float]]:
diff --git a/src/hope_dedup_engine/types.py b/src/hope_dedup_engine/types.py
index 5cdf2b5..d854919 100644
--- a/src/hope_dedup_engine/types.py
+++ b/src/hope_dedup_engine/types.py
@@ -3,17 +3,17 @@
 from hope_dedup_engine.constants import FacialError
 
 
-ReferencePK = str
-Filename = str
-Embedding = list[float]
-Score = float
+type ReferencePK = str
+type Filename = str
+type Embedding = list[float]
+type Score = float
 
-EntityImage = tuple[ReferencePK, Filename]
-EntityEmbedding = tuple[ReferencePK, Embedding]
-EntityEmbeddingError = tuple[ReferencePK, FacialError]
-ImageEmbedding = tuple[Filename, Embedding]
-ImageEmbeddingError = tuple[Filename, FacialError]
-Finding = tuple[ReferencePK, ReferencePK, Score]
+type EntityImage = tuple[ReferencePK, Filename]
+type EntityEmbedding = tuple[ReferencePK, Embedding]
+type EntityEmbeddingError = tuple[ReferencePK, FacialError]
+type ImageEmbedding = tuple[Filename, Embedding]
+type ImageEmbeddingError = tuple[Filename, FacialError]
+type Finding = tuple[ReferencePK, ReferencePK, Score]
 
 
 class SortedTuple(tuple):
diff --git a/src/hope_dedup_engine/utils/celery/utility_tasks.py b/src/hope_dedup_engine/utils/celery/utility_tasks.py
index 12c3368..47bc699 100644
--- a/src/hope_dedup_engine/utils/celery/utility_tasks.py
+++ b/src/hope_dedup_engine/utils/celery/utility_tasks.py
@@ -1,15 +1,32 @@
-from itertools import batched
-from typing import Any, NoReturn
+import itertools
+from collections.abc import Iterable
+from typing import Any, Generator, NoReturn
 
 import celery
 from celery import canvas
 
 from hope_dedup_engine.config.celery import app
 from hope_dedup_engine.utils.celery.task_result import unwrap_result, wrapped
+from hope_dedup_engine.utils.compact_pairs import CompactPairs, split
 
 SerializedTask = dict[str, Any]
 
 
+@app.task
+def batched[T](iterable: Iterable[T], size: int) -> Iterable[Iterable[T]]:
+    return itertools.batched(iterable, size)
+
+
+@app.task
+def batched_compact_pairs[
+    T
+](pairs: CompactPairs[T], size: int) -> Generator[CompactPairs[T], None, None]:
+    return split(pairs, size)
+
+
+DEFAULT_SPLITTER = batched.s()
+
+
 @app.task(bind=True)
 @wrapped
 def parallelize(
@@ -18,6 +35,7 @@ def parallelize(
     task: SerializedTask,
     batch_size: int,
     end_task: SerializedTask | None = None,
+    splitter: SerializedTask = DEFAULT_SPLITTER,
 ) -> NoReturn:
     producer_signature = self.app.signature(producer)
     data = unwrap_result(producer_signature())
@@ -25,7 +43,8 @@
     signature: canvas.Signature = self.app.signature(task)
 
     signatures = []
-    for batch in batched(data, batch_size):
+    splitter_signature = self.app.signature(splitter)
+    for batch in splitter_signature(data, batch_size):
         args = (batch,)
         if isinstance(signature, canvas._chain):
             clone = signature.clone()
diff --git a/src/hope_dedup_engine/utils/compact_pairs.py b/src/hope_dedup_engine/utils/compact_pairs.py
new file mode 100644
index 0000000..d383cf7
--- /dev/null
+++ b/src/hope_dedup_engine/utils/compact_pairs.py
@@ -0,0 +1,55 @@
+from collections.abc import Collection, Generator
+from functools import partial
+from itertools import accumulate, islice, takewhile
+from operator import ge
+
+type CompactPairs[T] = tuple[Collection[T], int, int]
+
+
+def total_length(collection: Collection) -> int:
+    return (collection_length := len(collection)) * (collection_length - 1) // 2
+
+
+def length(compact_pairs: CompactPairs) -> int:
+    sequence, start, stop = compact_pairs
+    return stop - start if stop else total_length(sequence) - start
+
+
+def from_collection[T](collection: Collection[T]) -> CompactPairs[T]:
+    return collection, 0, total_length(collection)
+
+
+def optimize[T](compact_pairs: CompactPairs[T]) -> CompactPairs[T]:
+    sequence, start, end = compact_pairs
+    deltas = tuple(
+        takewhile(partial(ge, start), accumulate(range(len(sequence) - 1, 0, -1)))
+    )
+    skip = len(deltas)
+    sequence = sequence[skip:]
+    if deltas:
+        start -= deltas[-1]
+        if end:
+            end -= deltas[-1]
+
+    return sequence, start, end
+
+
+def unwrap[T](pairs: CompactPairs[T]) -> Generator[tuple[T, T], None, None]:
+    sequence, start, end = pairs
+    all_pairs = (
+        (e1, e2) for i, e1 in enumerate(sequence) for e2 in sequence[i + 1:]  # fmt: skip
+    )
+    yield from islice(all_pairs, start, end)
+
+
+def split[
+    T
+](pairs: CompactPairs[T], size: int) -> Generator[CompactPairs[T], None, None]:
+    sequence, start, stop = pairs
+    stop = stop or length(pairs)
+    bounds = list(range(start, stop + 1, size))
+    if bounds[-1] < stop:
+        bounds.append(stop)
+
+    for start, stop in zip(bounds, bounds[1:]):
+        yield optimize((sequence, start, stop))
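
Usage note (not part of the diff): the patch replaces the materialised `itertools.combinations()` output with a `CompactPairs` value, a `(collection, start, stop)` triple that describes a slice of the pair space without expanding it, and `parallelize()` now splits work via the pluggable `splitter` signature (`batched_compact_pairs` here), while ignored pairs are filtered lazily inside `find_duplicates`. The sketch below is illustrative only, with made-up item values standing in for `EntityEmbedding` tuples; it exercises the new `compact_pairs` helpers exactly as they appear in the diff.

```python
# Illustrative only: round trip through the compact_pairs helpers added by this
# diff; the item values are hypothetical stand-ins for EntityEmbedding tuples.
from itertools import combinations

from hope_dedup_engine.utils import compact_pairs

items = ["a", "b", "c", "d", "e"]

pairs = compact_pairs.from_collection(items)  # (items, 0, 10): C(5, 2) == 10 pairs
assert compact_pairs.length(pairs) == 10

# split() yields CompactPairs chunks: offsets into the pair space plus a
# (possibly trimmed) slice of the collection, never an expanded pair list.
chunks = list(compact_pairs.split(pairs, 4))
assert [compact_pairs.length(chunk) for chunk in chunks] == [4, 4, 2]

# Unwrapping the chunks in order reproduces exactly the combinations() stream
# that get_deduplication_set_embedding_pairs used to return.
unpacked = [pair for chunk in chunks for pair in compact_pairs.unwrap(chunk)]
assert unpacked == list(combinations(items, 2))
```

Note that `split()` runs `optimize()` on every chunk, so elements whose pairs lie entirely before the chunk's `start` offset are dropped from the collection that travels with the Celery task arguments.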