Skip to content

Commit

Permalink
Optimize memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
sergey-misuk-valor committed Jan 26, 2025
1 parent 4940804 commit 98d75a0
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 46 deletions.
10 changes: 6 additions & 4 deletions src/hope_dedup_engine/apps/faces/celery/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from hope_dedup_engine.apps.api.models import DeduplicationSet
from hope_dedup_engine.apps.faces.celery.tasks.deduplication import (
encode_images,
filter_ignored_pairs,
find_duplicates,
get_deduplication_set_embedding_pairs,
get_deduplication_set_image_files,
save_encoding_errors_in_findings,
)
from hope_dedup_engine.utils.celery.utility_tasks import parallelize
from hope_dedup_engine.utils.celery.utility_tasks import (
batched_compact_pairs,
parallelize,
)

IMAGE_ENCODING_BATCH_SIZE = 50
DUPLICATE_FINDING_BATCH_SIZE = 200
Expand All @@ -27,9 +29,9 @@ def image_pipeline(
)
find_duplicates_pipeline = parallelize.si(
get_deduplication_set_embedding_pairs.s(deduplication_set.id),
filter_ignored_pairs.s(deduplication_set.id)
| find_duplicates.s(deduplication_set.id, config.get("deduplicate", {})),
find_duplicates.s(deduplication_set.id, config.get("deduplicate", {})),
DUPLICATE_FINDING_BATCH_SIZE,
splitter=batched_compact_pairs.s(),
)

return (
Expand Down
46 changes: 19 additions & 27 deletions src/hope_dedup_engine/apps/faces/celery/tasks/deduplication.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from collections.abc import Iterator
from itertools import combinations
from collections.abc import Generator, Iterable
from typing import Any

from hope_dedup_engine.apps.api.models import DeduplicationSet
Expand All @@ -10,6 +9,7 @@
from hope_dedup_engine.config.celery import app
from hope_dedup_engine.constants import FacialError
from hope_dedup_engine.types import EntityEmbedding, Filename, SortedTuple
from hope_dedup_engine.utils import compact_pairs
from hope_dedup_engine.utils.celery.task_result import wrapped


Expand Down Expand Up @@ -41,11 +41,24 @@ def encode_images(
deduplication_set.update_encoding_errors(errors)


def filter_ignored_pairs(
    embedding_pairs: Iterable[tuple[EntityEmbedding, EntityEmbedding]],
    deduplication_set: DeduplicationSet,
) -> Generator[tuple[EntityEmbedding, EntityEmbedding], None, None]:
    """Lazily yield only the embedding pairs not marked as ignored.

    Pairs are matched against the deduplication set's ignored-pair registry
    by their reference PKs, order-insensitively (via SortedTuple).
    """
    ignored = deduplication_set.get_ignored_pairs()
    for pair in embedding_pairs:
        (first_pk, _), (second_pk, _) = pair
        if SortedTuple((first_pk, second_pk)) not in ignored:
            yield pair


@app.task
@wrapped
def get_deduplication_set_embedding_pairs(
deduplication_set_id: str,
) -> Iterator[tuple[EntityEmbedding, EntityEmbedding]]:
) -> compact_pairs.CompactPairs[EntityEmbedding]:
deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
pk=deduplication_set_id
)
Expand All @@ -58,41 +71,20 @@ def get_deduplication_set_embedding_pairs(
if filename in deduplication_set.encodings
)

return combinations(entity_embeddings, 2)


@app.task
@wrapped
def filter_ignored_pairs(
embedding_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
deduplication_set_id: str,
) -> list[tuple[EntityEmbedding, EntityEmbedding]]:
deduplication_set: DeduplicationSet = DeduplicationSet.objects.get(
pk=deduplication_set_id
)
ignored_pairs = deduplication_set.get_ignored_pairs()
filtered = []
for embedding_pair in embedding_pairs:
first, second = embedding_pair
first_reference_pk, _ = first
second_reference_pk, _ = second
if SortedTuple((first_reference_pk, second_reference_pk)) not in ignored_pairs:
filtered.append(embedding_pair)

return filtered
return compact_pairs.from_collection(entity_embeddings)


@app.task
@wrapped
def find_duplicates(
embedding_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
embedding_pairs: compact_pairs.CompactPairs[EntityEmbedding],
deduplication_set_id: str,
deduplicate_config: dict[str, Any],
) -> None:
"""Deduplicate faces in a chunk of files."""
deduplication_set = DeduplicationSet.objects.get(pk=deduplication_set_id)
findings = find_similar_faces(
embedding_pairs,
filter_ignored_pairs(compact_pairs.unwrap(embedding_pairs), deduplication_set),
dedupe_threshold=deduplicate_config.get("threshold"),
options=deduplicate_config,
)
Expand Down
4 changes: 2 additions & 2 deletions src/hope_dedup_engine/apps/faces/services/facial.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from collections.abc import Generator
from collections.abc import Generator, Iterable
from typing import Any, cast

from deepface import DeepFace
Expand Down Expand Up @@ -56,7 +56,7 @@ def face_similarity(first: Embedding, second: Embedding, **options: Any) -> floa


def find_similar_faces(
encoded_pairs: list[tuple[EntityEmbedding, EntityEmbedding]],
encoded_pairs: Iterable[tuple[EntityEmbedding, EntityEmbedding]],
dedupe_threshold: float,
options: dict[str, Any],
) -> Generator[tuple[EncodedFace, EncodedFace, float]]:
Expand Down
20 changes: 10 additions & 10 deletions src/hope_dedup_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

from hope_dedup_engine.constants import FacialError

ReferencePK = str
Filename = str
Embedding = list[float]
Score = float
# PEP 695 (Python 3.12+) type aliases for the scalar values passed around
# the facial deduplication pipeline.
type ReferencePK = str
type Filename = str
type Embedding = list[float]
type Score = float

EntityImage = tuple[ReferencePK, Filename]
EntityEmbedding = tuple[ReferencePK, Embedding]
EntityEmbeddingError = tuple[ReferencePK, FacialError]
ImageEmbedding = tuple[Filename, Embedding]
ImageEmbeddingError = tuple[Filename, FacialError]
Finding = tuple[ReferencePK, ReferencePK, Score]
# Composite tuples pairing an entity/image key with its payload or error.
type EntityImage = tuple[ReferencePK, Filename]
type EntityEmbedding = tuple[ReferencePK, Embedding]
type EntityEmbeddingError = tuple[ReferencePK, FacialError]
type ImageEmbedding = tuple[Filename, Embedding]
type ImageEmbeddingError = tuple[Filename, FacialError]
type Finding = tuple[ReferencePK, ReferencePK, Score]


class SortedTuple(tuple):
Expand Down
25 changes: 22 additions & 3 deletions src/hope_dedup_engine/utils/celery/utility_tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
from itertools import batched
from typing import Any, NoReturn
import itertools
from collections.abc import Iterable
from typing import Any, Generator, NoReturn

import celery
from celery import canvas

from hope_dedup_engine.config.celery import app
from hope_dedup_engine.utils.celery.task_result import unwrap_result, wrapped
from hope_dedup_engine.utils.compact_pairs import CompactPairs, split

SerializedTask = dict[str, Any]


@app.task
def batched[T](iterable: Iterable[T], size: int) -> Iterable[Iterable[T]]:
return itertools.batched(iterable, size)


@app.task
def batched_compact_pairs[
T
](pairs: CompactPairs[T], size: int) -> Generator[CompactPairs[T], None, None]:
return split(pairs, size)


# Fallback splitter signature for parallelize(): plain fixed-size batching
# via the `batched` task when the caller does not pass its own `splitter`.
DEFAULT_SPLITTER = batched.s()


@app.task(bind=True)
@wrapped
def parallelize(
Expand All @@ -18,14 +35,16 @@ def parallelize(
task: SerializedTask,
batch_size: int,
end_task: SerializedTask | None = None,
splitter: SerializedTask = DEFAULT_SPLITTER,
) -> NoReturn:
producer_signature = self.app.signature(producer)
data = unwrap_result(producer_signature())

signature: canvas.Signature = self.app.signature(task)

signatures = []
for batch in batched(data, batch_size):
splitter_signature = self.app.signature(splitter)
for batch in splitter_signature(data, batch_size):
args = (batch,)
if isinstance(signature, canvas._chain):
clone = signature.clone()
Expand Down
55 changes: 55 additions & 0 deletions src/hope_dedup_engine/utils/compact_pairs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from collections.abc import Collection, Generator
from functools import partial
from itertools import accumulate, islice, takewhile
from operator import ge

# Compact pair range: (collection, start, stop) denotes the slice
# [start:stop) of all unordered pairs of `collection`, enumerated in
# itertools.combinations order — O(n) memory instead of materializing the
# O(n^2) pairs. A falsy stop means "to the end" (see length()).
type CompactPairs[T] = tuple[Collection[T], int, int]


def total_length(collection: Collection) -> int:
    """Return the number of unordered pairs in *collection*: n*(n-1)/2."""
    size = len(collection)
    return size * (size - 1) // 2


def length(compact_pairs: CompactPairs) -> int:
    """Return how many pairs the compact range spans.

    A falsy *stop* means "until the end of all pairs", so the remaining
    count is the total pair count minus *start*.
    """
    sequence, start, stop = compact_pairs
    if stop:
        return stop - start
    count = len(sequence)
    return count * (count - 1) // 2 - start


def from_collection[T](collection: Collection[T]) -> CompactPairs[T]:
return collection, 0, total_length(collection)


def optimize[T](compact_pairs: CompactPairs[T]) -> CompactPairs[T]:
sequence, start, end = compact_pairs
deltas = tuple(
takewhile(partial(ge, start), accumulate(range(len(sequence) - 1, 0, -1)))
)
skip = len(deltas)
sequence = sequence[skip:]
if deltas:
start -= deltas[-1]
if end:
end -= deltas[-1]

return sequence, start, end


def unwrap[T](pairs: CompactPairs[T]) -> Generator[tuple[T, T], None, None]:
sequence, start, end = pairs
all_pairs = (
(e1, e2) for i, e1 in enumerate(sequence) for e2 in sequence[i + 1:] # fmt: skip
)
yield from islice(all_pairs, start, end)


def split[
T
](pairs: CompactPairs[T], size: int) -> Generator[CompactPairs[T], None, None]:
sequence, start, stop = pairs
stop = stop or length(pairs)
bounds = list(range(start, stop + 1, size))
if bounds[-1] < stop:
bounds.append(stop)

for start, stop in zip(bounds, bounds[1:]):
yield optimize((sequence, start, stop))

0 comments on commit 98d75a0

Please sign in to comment.