Skip to content

Commit

Permalink
re add rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
mrzaizai2k committed Nov 12, 2024
1 parent 7755a21 commit 460b7e5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 31 deletions.
7 changes: 3 additions & 4 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ fixed_line_titles:
- Toll
- Parking fees

batch_processor:
queue_size: 3
batch_size: 1 # Process batch_size documents at a time
processing_interval: 60 # Wait at least processing_interval seconds between batches

rate_limit:
max_files_per_min: 30
58 changes: 31 additions & 27 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
from src.invoice_extraction import validate_invoice
from src.Utils.logger import create_logger
from src.mail import EmailSender
from src.Utils.process_documents_utils import get_egw_file, get_excel_files, BatchProcessor
from src.Utils.process_documents_utils import get_egw_file, get_excel_files, process_single_document
from src.rate_limiter import RateLimiter


from dotenv import load_dotenv
load_dotenv()
Expand All @@ -54,42 +56,32 @@

email_sender = EmailSender(config=config, logger=logger)

batch_processor = BatchProcessor(
ocr_reader=ocr_reader,
invoice_extractor=invoice_extractor,
config=config,
email_sender=email_sender,
mongo_db=mongo_db,
logger=logger,
)

def process_in_batches(batch_size=2):
while True:
# Get the next batch of "not extracted" documents, up to the batch size
documents, _ = mongo_db.get_documents(filters={"status": "not extracted"}, limit=batch_size)

# If no documents are left, exit the loop
if not documents:
break

# Process each document one at a time
for document in documents:
batch_processor.process_single_document(document)

max_files_per_min = config['rate_limit']['max_files_per_min']
rate_limiter = RateLimiter(max_files_per_min)

def process_change_stream(config):
global change_stream
batch_processor.start()

for change in change_stream:
if change['operationType'] == 'insert':
# Get the current count of "not extracted" documents
_, total_matching_docs = mongo_db.get_documents(filters={"status": "not extracted"})

# Only proceed if there are fewer than 3 "not extracted" documents
if total_matching_docs < 3:
# Start processing in batches of 3
process_in_batches(batch_size=2)
if total_matching_docs > 3:
continue
# Start processing in batches of 3
while True:
documents, _ = mongo_db.get_documents(filters={"status": "not extracted"})

# If no documents are left, exit the loop
if not documents:
break

# Process each document one at a time
for document in documents:
process_single_document(ocr_reader=ocr_reader, invoice_extractor=invoice_extractor,
config=config, mongo_db=mongo_db, logger=logger, document=document)


elif change['operationType'] == 'update':
Expand Down Expand Up @@ -191,6 +183,18 @@ async def hello():
return {"message": "Hello, world!"}


# Apply middleware for rate limiting
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
if request.url.path == "/api/v1/invoices/upload":
if not await rate_limiter.is_allowed():
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"status": "error", "message": "Rate limit exceeded. Try again later."}
)
return await call_next(request)


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
try:
Expand Down
23 changes: 23 additions & 0 deletions src/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sys
sys.path.append("")

from asyncio import Lock
import time

class RateLimiter:
def __init__(self, max_requests_per_min):
self.max_requests = max_requests_per_min
self.request_times = []
self.lock = Lock()

async def is_allowed(self) -> bool:
async with self.lock:
current_time = time.time()
# Remove requests older than 1 minute
self.request_times = [t for t in self.request_times if current_time - t < 60]
# Check if rate limit exceeded
if len(self.request_times) >= self.max_requests:
return False
# Record current request time
self.request_times.append(current_time)
return True

0 comments on commit 460b7e5

Please sign in to comment.