Skip to content

Commit

Permalink
test single doc
Browse files Browse the repository at this point in the history
  • Loading branch information
mrzaizai2k committed Nov 12, 2024
1 parent 672ae36 commit 7755a21
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 31 deletions.
22 changes: 22 additions & 0 deletions src/Utils/process_documents_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,28 @@ def _process_single_document(self, document: Dict):
with self.lock:
self.currently_processing.remove(document_id)
self.queued_documents.remove(document_id)

def process_single_document(self, document: Dict):
    """Extract invoice data from one document and store the result.

    Reads ``_id``, ``invoice_image_base64`` and ``file_name`` from
    *document*, passes the image to ``extract_invoice_info`` together with
    this processor's OCR reader, invoice extractor, config and logger, and
    writes the returned data back through
    ``self.mongo_db.update_document_by_id``.

    Any exception is logged via ``self.logger`` (when one is set) and then
    swallowed; nothing is re-raised to the caller.

    Args:
        document: Mapping holding at least the three keys listed above.
    """
    # Bound before the ``try`` so the error handler can always reference it,
    # even when the failure is the ``_id`` lookup itself. Previously that
    # case raised UnboundLocalError from inside the handler.
    document_id = None
    try:
        document_id = document['_id']
        base64_img = document['invoice_image_base64']
        file_name = document['file_name']

        new_data = extract_invoice_info(
            base64_img=base64_img,
            ocr_reader=self.ocr_reader,
            invoice_extractor=self.invoice_extractor,
            config=self.config,
            logger=self.logger,
            file_name=file_name,
        )

        self.mongo_db.update_document_by_id(str(document_id), new_data)

    except Exception as e:
        if self.logger:
            self.logger.error(f"Error processing document {document_id}: {str(e)}")


def get_total_docs(self) -> int:
"""Returns the total number of documents in the queue plus those currently processing."""
Expand Down
62 changes: 31 additions & 31 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,33 @@
logger=logger,
)

def process_in_batches(batch_size=2):
    """Process "not extracted" documents, ``batch_size`` at a time.

    Repeatedly asks ``mongo_db.get_documents`` for documents whose status
    is "not extracted" and hands each one to
    ``batch_processor.process_single_document``. The loop stops when a
    query returns no documents or returns only documents that were
    already attempted during this call.

    Args:
        batch_size: Maximum number of documents requested per query.
    """
    # Ids already handed to the processor during this call.
    # process_single_document logs and swallows its own errors, so a failed
    # document keeps the "not extracted" status; without this bookkeeping
    # the loop would fetch and retry the same document forever.
    attempted_ids = set()

    while True:
        # Get the next batch of "not extracted" documents, skipping the
        # ones already attempted so later documents are still reached.
        documents, _ = mongo_db.get_documents(
            filters={
                "status": "not extracted",
                "_id": {"$nin": list(attempted_ids)},
            },
            limit=batch_size,
        )

        # If no documents are left, exit the loop
        if not documents:
            break

        # Guard against the exclusion filter not matching (e.g. ids come
        # back in a different representation): only untried documents count.
        fresh_documents = [
            document for document in documents
            if document.get('_id') not in attempted_ids
        ]
        if not fresh_documents:
            break

        # Process each document one at a time
        for document in fresh_documents:
            attempted_ids.add(document.get('_id'))
            batch_processor.process_single_document(document)


def process_change_stream(config):
global change_stream
batch_processor.start()

for change in change_stream:
if change['operationType'] == 'insert':
# Only process the newly inserted document
document_id = change['documentKey']['_id']
document = mongo_db.get_document_by_id(document_id)
# Get the current count of "not extracted" documents
_, total_matching_docs = mongo_db.get_documents(filters={"status": "not extracted"})

if document and document.get('status') == 'not extracted':
# Try to add to queue, log if queue is full
if not batch_processor.add_to_queue(document):
logger.warning(f"Could not add document {document_id} to queue - queue full or document already queued")

# Optionally, check for any missed documents periodically
if random.random() < 0.1: # 10% chance to check for missed documents
unprocessed_docs, _ = mongo_db.get_documents(
filters={
"status": "not extracted",
"_id": {"$nin": list(batch_processor.queued_documents.union(batch_processor.currently_processing))}
},
limit=5
)

for doc in unprocessed_docs:
batch_processor.add_to_queue(doc)
# Only proceed if there are fewer than 3 "not extracted" documents
if total_matching_docs < 3:
# Start processing in batches of 3
process_in_batches(batch_size=2)


elif change['operationType'] == 'update':
Expand Down Expand Up @@ -270,17 +270,17 @@ async def upload_invoice(

try:

queue = batch_processor.get_total_docs()
logger.debug(f'number of docs in queue {queue}')
if batch_processor.get_total_docs() >= config['batch_processor']['queue_size']: # Adjust queue size limit as needed
msg = {
"status": "error",
"message": "Server is currently processing too many documents. Please try again later."
}
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content=msg
)
# queue = batch_processor.get_total_docs()
# logger.debug(f'number of docs in queue {queue}')
# if batch_processor.get_total_docs() >= config['batch_processor']['queue_size']: # Adjust queue size limit as needed
# msg = {
# "status": "error",
# "message": "Server is currently processing too many documents. Please try again later."
# }
# return JSONResponse(
# status_code=status.HTTP_429_TOO_MANY_REQUESTS,
# content=msg
# )

# Parse JSON body
body = await request.json()
Expand Down

0 comments on commit 7755a21

Please sign in to comment.