Skip to content

Commit

Permalink
Fix sqs handler type mismatch issues
Browse files Browse the repository at this point in the history
  • Loading branch information
MU-Software committed Dec 15, 2024
1 parent 32aab17 commit 1cbd3f6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 35 deletions.
24 changes: 7 additions & 17 deletions runtime/chalicelib/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,11 @@
import chalicelib.config as config_module
import chalicelib.util.import_util as import_util

if typing.TYPE_CHECKING:
import mypy_boto3_sqs.type_defs

type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef"
else:
type RecordType = dict[str, typing.Any]

SQSEventType = typing.TypedDict("SQSEventType", {"Records": list[RecordType]})
WorkerType = typing.Callable[[RecordType], dict[str, typing.Any]]
WorkerType = typing.Callable[[chalice.app.SQSRecord], dict[str, typing.Any]]

logger = logging.getLogger(__name__)
workers: dict[str, WorkerType] = {}
worker_handler_blueprint = chalice.app.Blueprint(__name__)

for _workers in typing.cast(
list[list[WorkerType]],
Expand All @@ -31,15 +24,12 @@
workers.update({worker.__name__: worker for worker in _workers})


def _sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]:
parsed_event: SQSEventType = event.to_dict()
logger.info(f"{parsed_event=}")

@worker_handler_blueprint.on_sqs_message(queue=config_module.config.infra.queue_name)
def sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]:
results: list[dict[str, typing.Any]] = []
for record in parsed_event["Records"]:
for record in event:
try:
worker_name = json.loads(record["body"])["worker"]
results.append(workers[worker_name](record))
results.append(workers[json.loads(record.body)["worker"]](record))
except Exception as e:
logger.error(f"Failed to handle event: {record}", exc_info=e)
results.append({"error": "Failed to handle event"})
Expand All @@ -49,4 +39,4 @@ def _sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]:


def register_worker(app: chalice.app.Chalice) -> None:
app.on_sqs_message(queue=config_module.config.infra.queue_name)(_sqs_handler)
app.register_blueprint(worker_handler_blueprint)
17 changes: 8 additions & 9 deletions runtime/chalicelib/worker/notification_sender.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import functools
import typing

import chalice.app
import chalicelib.send_manager as send_manager
import chalicelib.send_manager.__interface__ as send_mgr_interface
import pydantic

if typing.TYPE_CHECKING:
import mypy_boto3_sqs.type_defs

type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef"
else:
type RecordType = dict[str, typing.Any]


class WorkerPayload(pydantic.BaseModel):
sender_type: str
Expand Down Expand Up @@ -41,8 +35,13 @@ def send(self) -> dict[str, str]:
return self.send_manager.send(self.send_request_payload)


def notification_sender(record: RecordType) -> dict[str, str]:
return WorkerPayload.model_validate_json(record["body"]).send()
class SQSRecordBody(pydantic.BaseModel):
worker: str
worker_payload: WorkerPayload


def notification_sender(record: chalice.app.SQSRecord) -> dict[str, str]:
return SQSRecordBody.model_validate_json(record.body).worker_payload.send()


workers = [notification_sender]
13 changes: 4 additions & 9 deletions runtime/chalicelib/worker/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import typing
import json

if typing.TYPE_CHECKING:
import mypy_boto3_sqs.type_defs
import chalice.app

type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef"
else:
type RecordType = dict[str, typing.Any]


def test_handler(record: RecordType) -> RecordType:
print(record)
def test_handler(record: chalice.app.SQSRecord) -> chalice.app.SQSRecord:
print(record.to_dict(), json.loads(record.body))
return record


Expand Down

0 comments on commit 1cbd3f6

Please sign in to comment.