Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow prefetch_count when consuming a channel. #27

Merged
merged 1 commit into from
Sep 26, 2019

Conversation

jdl
Copy link
Contributor

@jdl jdl commented May 31, 2019

When starting up a consumer process this adds support for setting a
prefetch on the channel.

For example:

children = [
  {EchoConsumer, [pool_id: :consumers_pool, queue: "echo_queue", options: [prefetch_count: 1]]},

When starting up a consumer process this adds support for setting a
prefetch on the channel.

For example:

    children = [
      {EchoConsumer, [pool_id: :consumers_pool, queue: "echo_queue", options: [prefetch_count: 1]]},
@jdl
Copy link
Contributor Author

jdl commented May 31, 2019

I wasn't trying to make a PR into esl:master quite yet, but I guess I'll use this to ask.

I wanted to support setting a prefetch_count on channels for my consumers. I was having a difficult time figuring out how to properly test this, since there seems to be no way via AMQP to fetch the settings of a channel. You can see that they are changed in the management console, but doing it from within the test has proven difficult.

As far as setting the prefetch_count is this even the right approach? It appears to actually work, but I'm not sure if I've just hacked my way around a better approach.

@sescobb27 sescobb27 requested review from a user, sescobb27 and Ayanda-D May 31, 2019 18:30
@sescobb27
Copy link
Contributor

Hi, i don't think that logic should be inside the adapter. if we want to set up a channel before starting to consume messages maybe we can add some callbacks to the consumer behavior like setup_channel in which we pass the channel to that function after we got a channel from the pool, then the user can call any logic they want in that function before even calling consume thoughts @bryanhuntesl @Ayanda-D

@jdl
Copy link
Contributor Author

jdl commented Jun 3, 2019

Are you thinking that this callback would belong in the :ok version of Consumer.handle_channel_checkout/2?

I'm certainly willing to work on this. I'm not sure how best to test something like this without introducing mocks however.

@sescobb27
Copy link
Contributor

@jdl yes, in the :ok branch of that function, i think what we can test this feature by sending 2 messages and make the prefetch_count equal to 2, so the 2 messages should be fetched and we can assert on both, but before starting with the implementation lets wait for @bryanhuntesl and/or @Ayanda-D so we all agree this would be the right approach

@Ayanda-D
Copy link
Contributor

Ayanda-D commented Jun 3, 2019

Supporting prefetch sounds good. I think an approach of having a qos callback extension in the adapter behaviour would be good. So the checkout'ed channel's prefetch would be set via the adapter. Then also grant the option/possibility of the internal consumer to make use of a configured prefetch

@Ayanda-D
Copy link
Contributor

Ayanda-D commented Jun 3, 2019

Then (as discussed), setup_queue.ex and setup_channel.ex could be consolidated to "functions" in the/a same setup_rabbit.ex module. setup_channel can then abstract the qos calls to carryout prefetch. Please also add option/support to configure whether prefetch is global or not[1] 👍

[1] https://www.rabbitmq.com/consumer-prefetch.html

@ghost
Copy link

ghost commented Jun 3, 2019

Agree with @sescobb27 - it would seem to be more appropriate to follow the underlying semantics of pma/amqp and handle the following in separate code blocks:

  • create the channel
  • set the QOS options
  • consume

@sescobb27 testing strategy seems appropriate to me.

@Ayanda-D
Copy link
Contributor

Ayanda-D commented Jun 3, 2019

ok, bear in mind the number of configured channels are created implicitly by the connection "worker" on initialisation https://github.com/esl/ex_rabbit_pool/blob/master/lib/worker/rabbit_connection.ex#L194-L196 so setting QoS once on checkout (when necessary as this ain't a mandatory operation), would make more sense than incurring the overhead of doing this each time on channel creation.

The adapter behaviour already defines and abstracts interactions with Rabbit over pma/amqp lib, so keeping QoS functionality in there should be the approach (separate module here would confuse things).

Then for the setup procedures, the fabric is configured in the config.exs which is only setup once. Multiple modules for "once off" initialisation steps could be unnecessary here - besides, we want to be able to support different messaging platforms as well, mentioned in #28 - so I think its best we implement these in a single module setup_rabbit.ex module (or if you really want this separate, which I still find unnecessary for once-off procedures, these can be under the same namespace/directory of the lib, e.g. /rabbit/setup_queue.ex ). And as mentioned these need not be GenServer but static API modules - we aren't retaining any state on fabric initialisation

@jdl
Copy link
Contributor Author

jdl commented Jun 11, 2019

I don't pretend to understand everything in the last reply, but does this seem like a consensus on what should be changed?

@ghost
Copy link

ghost commented Jun 11, 2019

Agree with @sescobb27 - it would seem to be more appropriate to follow the underlying semantics of pma/amqp and handle the following in separate code blocks:

* create the channel

* set the QOS options

* consume

@sescobb27 testing strategy seems appropriate to me.

@sescobb27 - what would be the recommended strategy for users/implementors?

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting for feedback from @sescobb27 - might be it needs to be implemented a little differently.

@sescobb27
Copy link
Contributor

add a qos/2 function to the behaviour, then the adapter should implement it and call the amqp function, then in handle_channel_checkout in the consumer implementation, before starting to consume messages call a callback function that can be overridden by the user something like setup_channel(channel) that returns :ok | {:error, error} (the same as the qos/2 function), the default implementation should return :ok if the operation was successful consume, else continue with the retry logic

with :ok <- setup_channel(channel),
     {:ok, tag} <- adapter.consume(...) do
  ... # continue
else
  {:error, error} -> # retry logic
end

thoughts? @bryanhuntesl

@sescobb27 sescobb27 merged commit ce03aa3 into esl:master Sep 26, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants