Skip to content

Commit

Permalink
new: adjust max internal batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
joein committed Jan 23, 2025
1 parent 42a5172 commit 7976b88
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
3 changes: 3 additions & 0 deletions qdrant_client/embed/model_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def process(self, items: Iterable[tuple[int, Any]]) -> Iterable[tuple[int, Any]]


class ModelEmbedder:
MAX_INTERNAL_BATCH_SIZE = 4

def __init__(self, parser: Optional[ModelSchemaParser] = None, **kwargs):
self._batch_accumulator: dict[str, list[INFERENCE_OBJECT_TYPES]] = {}
self._embed_storage: dict[str, list[NumericVector]] = {}
Expand Down Expand Up @@ -105,6 +107,7 @@ def embed_models_strict(
num_workers=parallel,
worker=self._get_worker_class(),
start_method=start_method,
max_internal_batch_size=self.MAX_INTERNAL_BATCH_SIZE,
)

for batch in pool.ordered_map(raw_models_batches):
Expand Down
10 changes: 8 additions & 2 deletions qdrant_client/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# Single item should be processed in less than:
processing_timeout = 10 * 60 # seconds

max_internal_batch_size = 200
MAX_INTERNAL_BATCH_SIZE = 200


class QueueSignals(str, Enum):
Expand Down Expand Up @@ -85,7 +85,13 @@ def input_queue_iterable() -> Iterable[Any]:


class ParallelWorkerPool:
def __init__(self, num_workers: int, worker: Type[Worker], start_method: Optional[str] = None):
def __init__(
self,
num_workers: int,
worker: Type[Worker],
start_method: Optional[str] = None,
max_internal_batch_size: int = MAX_INTERNAL_BATCH_SIZE,
):
self.worker_class = worker
self.num_workers = num_workers
self.input_queue: Optional[Queue] = None
Expand Down

0 comments on commit 7976b88

Please sign in to comment.