diff --git a/runtime/chalicelib/worker/__init__.py b/runtime/chalicelib/worker/__init__.py index b53bd19..90e8320 100644 --- a/runtime/chalicelib/worker/__init__.py +++ b/runtime/chalicelib/worker/__init__.py @@ -7,18 +7,11 @@ import chalicelib.config as config_module import chalicelib.util.import_util as import_util -if typing.TYPE_CHECKING: - import mypy_boto3_sqs.type_defs - - type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef" -else: - type RecordType = dict[str, typing.Any] - -SQSEventType = typing.TypedDict("SQSEventType", {"Records": list[RecordType]}) -WorkerType = typing.Callable[[RecordType], dict[str, typing.Any]] +WorkerType = typing.Callable[[chalice.app.SQSRecord], dict[str, typing.Any]] logger = logging.getLogger(__name__) workers: dict[str, WorkerType] = {} +worker_handler_blueprint = chalice.app.Blueprint(__name__) for _workers in typing.cast( list[list[WorkerType]], @@ -31,15 +24,12 @@ workers.update({worker.__name__: worker for worker in _workers}) -def _sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]: - parsed_event: SQSEventType = event.to_dict() - logger.info(f"{parsed_event=}") - +@worker_handler_blueprint.on_sqs_message(queue=config_module.config.infra.queue_name) +def sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]: results: list[dict[str, typing.Any]] = [] - for record in parsed_event["Records"]: + for record in event: try: - worker_name = json.loads(record["body"])["worker"] - results.append(workers[worker_name](record)) + results.append(workers[json.loads(record.body)["worker"]](record)) except Exception as e: logger.error(f"Failed to handle event: {record}", exc_info=e) results.append({"error": "Failed to handle event"}) @@ -49,4 +39,4 @@ def _sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]: def register_worker(app: chalice.app.Chalice) -> None: - app.on_sqs_message(queue=config_module.config.infra.queue_name)(_sqs_handler) + app.register_blueprint(worker_handler_blueprint) diff --git a/runtime/chalicelib/worker/notification_sender.py b/runtime/chalicelib/worker/notification_sender.py index c6d55ea..239c5ab 100644 --- a/runtime/chalicelib/worker/notification_sender.py +++ b/runtime/chalicelib/worker/notification_sender.py @@ -1,17 +1,11 @@ import functools import typing +import chalice.app import chalicelib.send_manager as send_manager import chalicelib.send_manager.__interface__ as send_mgr_interface import pydantic -if typing.TYPE_CHECKING: - import mypy_boto3_sqs.type_defs - - type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef" -else: - type RecordType = dict[str, typing.Any] - class WorkerPayload(pydantic.BaseModel): sender_type: str @@ -41,8 +35,13 @@ def send(self) -> dict[str, str]: return self.send_manager.send(self.send_request_payload) -def notification_sender(record: RecordType) -> dict[str, str]: - return WorkerPayload.model_validate_json(record["body"]).send() +class SQSRecordBody(pydantic.BaseModel): + worker: str + worker_payload: WorkerPayload + + +def notification_sender(record: chalice.app.SQSRecord) -> dict[str, str]: + return SQSRecordBody.model_validate_json(record.body).worker_payload.send() workers = [notification_sender] diff --git a/runtime/chalicelib/worker/test_worker.py b/runtime/chalicelib/worker/test_worker.py index 243de21..10e2c96 100644 --- a/runtime/chalicelib/worker/test_worker.py +++ b/runtime/chalicelib/worker/test_worker.py @@ -1,15 +1,10 @@ -import typing +import json -if typing.TYPE_CHECKING: - import mypy_boto3_sqs.type_defs +import chalice.app - type RecordType = "mypy_boto3_sqs.type_defs.MessageTypeDef" -else: - type RecordType = dict[str, typing.Any] - -def test_handler(record: RecordType) -> RecordType: - print(record) +def test_handler(record: chalice.app.SQSRecord) -> chalice.app.SQSRecord: + print(record.to_dict(), json.loads(record.body)) return record