Package provides abstractions to create RabbitMQ async worker microservices.
AsyncWorker
provides general working module, that should be used as entry point for the entire application.
It's responsibility is to initialize RabbitMQ consuming connection and run the incoming event's loop.
In order to use it, the worker should be:
-
Initialized:
worker = AsyncWorker(**initialization_params)
-
Started:
worker.start(worker, handler)
connection_parameters
- any availablepika
's connection parameters in order to connect to RabbitMQexchange_params
- dict {str: str}, which keys and values are available to be used insidepika
's exchange_declare method callqueue_params
- dict {str: str}, which keys and values are available to be used insidepika
's queue_devlarerouting_key
- string to be used as routing key to bind queue to exchangeprefetch_count
- integer, defines prefetch_count for established connection.ack_mode
- one from theAckMode
enum. Can be acked before request is got, or after result is sent.
Worker
should be specified as a static class - with @classmethod
perform with next signature:
preform :: (cls, msg: bytes) -> bytes
Worker can perform task in syncronos manner with thread blokings, because for each worker's perform
call
new thread will be started.
Abstract class, that defines the contract should be implemented for each concrete realization of the response handler.
Concrete realization for the response handler - that sends responses back to RabbitMQ.
import pika
import logging
import time
from rmq_async_worker import AsyncWorker, RmqResponseHandler, AckMode
class Worker:
@classmethod
def perform(self, msg):
time.sleep(10)
logging.info(msg)
return b'result'
def main():
logging.basicConfig(
level=logging.INFO,
format='%(levelname) -10s %(asctime)s %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s'
)
AsyncWorker(
connection_parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'),
exchange_params={
'exchange': 'test_exchange',
'exchange_type': 'direct',
'passive': False,
'durable': True,
'auto_delete': False
},
queue_params={
'queue': "standard",
'auto_delete': False
},
routing_key='standard_key',
prefetch_count=2,
ack_mode=AckMode.ON_RESPOND
).start(
Worker,
RmqResponseHandler(
connection_parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'),
exchange_params={
'exchange': 'test_exchange',
'exchange_type': 'direct',
'passive': False,
'durable': True,
'auto_delete': False
},
routing_key='resp_key',
)
)
if __name__ == '__main__':
main()