Message processor

Basics

The message processor is the object that actually processes a message. It must return a result status. Here is an example:

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        return self::ACK;
    }
}

By returning self::ACK, the processor tells the broker that the message has been processed correctly.

The available statuses are:

  • self::ACK - Use this constant when the message has been processed successfully and can be removed from the queue.
  • self::REJECT - Use this constant when the message is not valid or cannot be processed. The message is removed from the queue.
  • self::REQUEUE - Use this constant when the message cannot be processed right now but could succeed later. The message is put back into the queue.
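
As a rough sketch of how one of the three statuses might be chosen, the snippet below maps two facts about a message to a status. To stay self-contained it defines a stand-in `Status` interface and a hypothetical `chooseStatus()` helper. Neither is part of Enqueue, and the constant values are placeholders. In a real processor you would return `self::ACK`, `self::REJECT` or `self::REQUEUE` directly.

```php
<?php
declare(strict_types=1);

// Stand-in for the constants a PsrProcessor exposes (values are placeholders).
interface Status
{
    const ACK = 'ack';
    const REJECT = 'reject';
    const REQUEUE = 'requeue';
}

/**
 * Hypothetical helper: picks a status from two facts about the message.
 */
function chooseStatus(bool $isValid, bool $dependencyAvailable): string
{
    if (!$isValid) {
        // Retrying cannot fix a malformed message, so drop it.
        return Status::REJECT;
    }

    if (!$dependencyAvailable) {
        // The message is fine, but something it needs (for example the
        // mail server) is unavailable: ask for another attempt later.
        return Status::REQUEUE;
    }

    // Processed successfully; the broker may remove the message.
    return Status::ACK;
}
```

The order of the checks is deliberate: validity is tested first, so an invalid message is never requeued and cannot loop forever.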

The following example validates the message before sending a mail. If the message is not valid, the processor rejects it.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Util\JSON;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $data = JSON::decode($message->getBody());
        // Reject the message when the referenced user cannot be found.
        if (!$user = $this->userRepository->find($data['userId'])) {
            return self::REJECT;
        }
        
        $this->mailer->send($user->getEmail(), $data['text']);
        
        return self::ACK;
    }
}

It is possible to find out whether a message has failed before. The isRedelivered method returns true if there was a previous attempt to process the message.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        if ($message->isRedelivered()) {
            return self::REQUEUE;
        }
        
        $this->mailer->send('[email protected]', $message->getBody());
        
        return self::ACK;
    }
}

The second argument is your context. You can use it to send messages to other queues or topics.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        $queue = $context->createQueue('anotherQueue');
        $message = $context->createMessage('Message has been sent');
        $context->createProducer()->send($queue, $message);
        
        return self::ACK;
    }
}

Reply result

The consumption component provides some useful extensions. For example, there is an extension that makes RPC processing simpler. The producer might wait for a reply from a consumer. To send that reply, the processor has to return a reply result. Don't forget to add ReplyExtension.

<?php
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

class SendMailProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $context) 
    {
        $this->mailer->send('[email protected]', $message->getBody());
        
        $replyMessage = $context->createMessage('Message has been sent');
        
        return Result::reply($replyMessage);
    }
}

/** @var \Interop\Queue\PsrContext $psrContext */

$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
    new ReplyExtension()
]));

$queueConsumer->bind('foo', new SendMailProcessor());

$queueConsumer->consume();

On exceptions

It is advised not to catch exceptions and to fail fast instead. Also consider using supervisord or a similar process manager to restart exited consumers.

Although failing fast is the general advice, there are some cases where you might want to catch exceptions:

  • A message validator throws an exception on an invalid message. It is better to catch it and return REJECT.
  • Some transports (Doctrine DBAL, Filesystem, Redis) do not notice an error and therefore cannot redeliver the message. The message is completely lost. You might want to catch the exception so that you can properly redeliver or requeue the message.
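
The two cases above could be handled along these lines. This is only a sketch under stated assumptions: `Status` is a stand-in for the processor constants (placeholder values), while `InvalidMessageException` and `processSafely()` are hypothetical names that do not exist in Enqueue. In a real processor the same try/catch would sit inside `process()`.

```php
<?php
declare(strict_types=1);

// Stand-in for the PsrProcessor status constants (values are placeholders).
interface Status
{
    const ACK = 'ack';
    const REJECT = 'reject';
    const REQUEUE = 'requeue';
}

// Hypothetical exception thrown by a message validator.
class InvalidMessageException extends \InvalidArgumentException
{
}

/**
 * Hypothetical wrapper: runs the real work and converts exceptions to statuses.
 *
 * @param callable $work                does the actual processing, may throw
 * @param bool     $transportRedelivers false for transports that would lose
 *                                      the message when the consumer fails
 */
function processSafely(callable $work, bool $transportRedelivers): string
{
    try {
        $work();

        return Status::ACK;
    } catch (InvalidMessageException $e) {
        // An invalid message will never succeed, so reject it.
        return Status::REJECT;
    } catch (\Exception $e) {
        if ($transportRedelivers) {
            // Fail fast: rethrow and rely on the broker to redeliver.
            throw $e;
        }

        // The transport would lose the message, so requeue it explicitly.
        return Status::REQUEUE;
    }
}
```

The validator exception is caught first because it is the more specific type. Any other exception is rethrown unless the transport is known to drop messages on failure, which keeps the fail-fast behaviour as the default.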

Examples

Feel free to contribute your own.
