Skip to content

Commit

Permalink
Reduce the delay between batches on the FE
Browse files Browse the repository at this point in the history
  • Loading branch information
mrzaizai2k committed Nov 12, 2024
1 parent 460b7e5 commit 107c39c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 158 deletions.
10 changes: 5 additions & 5 deletions jwt-auth-frontend/src/components/AddInovice/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ function AddInvoice({ username }) {
return;
}

const MAX_BATCH_SIZE = 5;
const MAX_BATCH_SIZE = 10;
const RETRY_LIMIT = 3;
let currentBatchIndex = 0;

Expand Down Expand Up @@ -205,10 +205,9 @@ function AddInvoice({ username }) {
setUploadProgress(progress);

currentBatchIndex += MAX_BATCH_SIZE;

if (currentBatchIndex < selectedFiles.length) {
await new Promise(resolve => setTimeout(resolve, 60000));
}

// Add 0.5-second delay between each batch
await new Promise(resolve => setTimeout(resolve, 500));
}

notification.destroy();
Expand All @@ -234,6 +233,7 @@ function AddInvoice({ username }) {
}
};


const deleteImage = (imageToDelete) => {
const indexToDelete = images.indexOf(imageToDelete);
setImages(prevImages => prevImages.filter(image => image !== imageToDelete));
Expand Down
177 changes: 24 additions & 153 deletions src/Utils/process_documents_utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import sys
sys.path.append("")

import queue
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List
import concurrent.futures

from typing import Dict
from src.egw_export import export_egw_file
from src.export_excel.main import export_json_to_excel
from src.Utils.utils import find_pairs_of_docs
Expand Down Expand Up @@ -79,152 +75,27 @@ def get_excel_files(mongo_db, start_of_month, updated_doc, logger):
return employee_expense_report_path, output_2_excel


class BatchProcessor:
def __init__(self, ocr_reader, invoice_extractor,
mongo_db, config: dict, email_sender, logger=None):

self.ocr_reader = ocr_reader
self.invoice_extractor = invoice_extractor
self.mongo_db = mongo_db
self.config = config
self.email_sender = email_sender
self.logger = logger

# Use a maximum queue size to prevent memory issues
self.process_queue = queue.Queue(maxsize=self.config['batch_processor']["queue_size"]) # Limit queue size
self.batch_size = self.config['batch_processor']["batch_size"]
self.processing_interval = self.config['batch_processor']["processing_interval"]
self.processing_thread = None
self.is_running = False
self.last_processed_time = datetime.now()
self.currently_processing = set()
self.queued_documents = set() # Track queued document IDs
self.lock = threading.Lock()
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)

def start(self):
if not self.is_running:
self.is_running = True
self.processing_thread = threading.Thread(target=self._process_queue)
self.processing_thread.daemon = True
self.processing_thread.start()

def stop(self):
self.is_running = False
if self.processing_thread:
self.processing_thread.join()
self.executor.shutdown(wait=True)

def add_to_queue(self, document: Dict) -> bool:
"""Add document to queue if not already queued. Returns True if added, False if skipped."""
def process_single_document(ocr_reader, invoice_extractor,
                            config, mongo_db, logger, document: dict):
    """Run OCR + invoice extraction on one queued document and persist the result.

    Args:
        ocr_reader: OCR engine forwarded to ``extract_invoice_info``.
        invoice_extractor: Invoice extraction model forwarded to
            ``extract_invoice_info``.
        config (dict): Pipeline configuration forwarded to the extractor.
        mongo_db: Database wrapper exposing ``update_document_by_id``.
        logger: Optional logger; when present, errors are also logged.
        document (dict): Document record; expected keys are ``_id``,
            ``invoice_image_base64`` and ``file_name``.

    Returns:
        None. Errors are deliberately swallowed (best-effort per-item
        processing): any exception is printed/logged but not re-raised,
        so one bad document does not abort the caller's batch.
    """
    # Resolve the id up front so the error message in ``except`` can always
    # name the document. Previously ``document_id`` was assigned inside the
    # ``try``; if ``document['_id']`` itself raised, the handler's f-string
    # referenced an unbound local and raised NameError, hiding the real error.
    document_id = document.get('_id', '<unknown>')
    try:
        base64_img = document['invoice_image_base64']
        file_name = document['file_name']

        new_data = extract_invoice_info(
            base64_img=base64_img,
            ocr_reader=ocr_reader,
            invoice_extractor=invoice_extractor,
            config=config,
            logger=logger,
            file_name=file_name
        )

        mongo_db.update_document_by_id(str(document_id), new_data)

    except Exception as e:
        msg = f"Error processing document {document_id}: {str(e)}"
        print(msg)
        if logger:
            # Pass the message positionally: project loggers may not accept
            # a ``msg=`` keyword, and positional is the stdlib convention.
            logger.error(msg)

0 comments on commit 107c39c

Please sign in to comment.