Skip to content

Commit

Permalink
docs: Update docs wrapping, and add more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Mar 4, 2024
1 parent 6d3a87a commit fa80d34
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 42 deletions.
8 changes: 7 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
{
"editor.formatOnSave": true
"editor.formatOnSave": true,
"editor.wordWrapColumn": 100,
"editor.wrappingIndent": "deepIndent",
"editor.wordWrap": "on",
"editor.rulers": [
100
]
}
110 changes: 69 additions & 41 deletions lib/consumer/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule RabbitMQStream.Consumer do
@moduledoc """
Used to declare a Persistent Consumer module. It is able to process
chunks by implementing the `handle_chunk/1` or `handle_chunk/2` callbacks.
Used to declare a Persistent Consumer module. It is able to process chunks by implementing the
`handle_chunk/1` or `handle_chunk/2` callbacks.
# Usage
Expand Down Expand Up @@ -29,7 +29,9 @@ defmodule RabbitMQStream.Consumer do
* `:offset_reference` -
* `:private` - Private data that can hold any value, and is passed to the `handle_chunk/2` callback.
* `:serializer` - The module to use to decode the message. Defaults to `__MODULE__`,
which means that the consumer will use the `decode!/1` callback to decode the message, which is implemented by default to return the message as is.
which means that the consumer will use the `decode!/1` callback to decode the message, which is
implemented by default to return the message as is.
* `:properties` - Define the properties of the subscription. Can only have one option at a time.
* `:single_active_consumer`: set to `true` to enable [single active consumer](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/) for this subscription.
Expand All @@ -40,53 +42,72 @@ defmodule RabbitMQStream.Consumer do
# Offset Tracking
The consumer is able to track its progress in the stream by storing its
latests offset in the stream. Check [Offset Tracking with RabbitMQ Streams(https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/) for more information on
how offset tracking works.
The consumer is able to track its progress in the stream by storing its latests offset in the stream.
Check [Offset Tracking with RabbitMQ Streams(https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/) for more information on how offset tracking works.
The consumer can be configured to use different offset tracking strategies, which decide when to
store the offset in the stream. You can implement your own strategy by implementing the
`RabbitMQStream.Consumer.OffsetTracking` behaviour, and passing it to the `:offset_tracking` option.
It defaults to `RabbitMQStream.Consumer.OffsetTracking.CountStrategy`, which stores the offset after,
by default, every 50_000 messages.
You can use the default strategies by passing a shorthand alias:
* `interval` : `RabbitMQStream.Consumer.OffsetTracking.IntervalStrategy`
* `count` : `RabbitMQStream.Consumer.OffsetTracking.CountStrategy`
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first,
offset_tracking: [
count: [store_after: 50],
interval: [store_after: 5_000]
]
The consumer can be configured to use different offset tracking strategies,
which decide when to store the offset in the stream. You can implement your
own strategy by implementing the `RabbitMQStream.Consumer.OffsetTracking`
behaviour, and passing it to the `:offset_tracking` option. It defaults to
`RabbitMQStream.Consumer.OffsetTracking.CountStrategy`, which stores the
offset after, by default, every 50_000 messages.
# Flow Control
The RabbitMQ Streams server requires that the consumer declares how many
messages it is able to process at a time. This is done by informing an amount
of 'credits' to the server. After every chunk is sent, one credit is consumed,
and the server will send messages only if there are credits available.
The RabbitMQ Streams server requires that the consumer declares how many messages it is able to
process at a time. This is done by informing an amount of 'credits' to the server. After every chunk
is sent, one credit is consumed, and the server will send messages only if there are credits available.
We can configure the consumer to automatically request more credits based on a strategy.
By default it uses the `RabbitMQStream.Consumer.FlowControl.MessageCount`, which
requests 1 additional credit for every 1 processed chunk. Please check the
RabbitMQStream.Consumer.FlowControl.MessageCount module for more information.
We can configure the consumer to automatically request more credits based on
a strategy. By default it uses the `RabbitMQStream.Consumer.FlowControl.MessageCount`,
which requests 1 additional credit for every 1 processed chunk. Please check
the RabbitMQStream.Consumer.FlowControl.MessageCount module for more information.
You can also call `RabbitMQStream.Consumer.credit/2` to manually add more credits to the subscription,

Check warning on line 82 in lib/consumer/consumer.ex

View workflow job for this annotation

GitHub Actions / docs

documentation references function "RabbitMQStream.Consumer.credit/2" but it is undefined or private
or implement your own strategy by implementing the `RabbitMQStream.Consumer.FlowControl` behaviour,
and passing it to the `:flow_control` option.
You can also call `RabbitMQStream.Consumer.credit/2` to manually add more
credits to the subscription, or implement your own strategy by implementing
the `RabbitMQStream.Consumer.FlowControl` behaviour, and passing
it to the `:flow_control` option.
You can find more information on the [RabbitMQ Streams documentation](https://www.rabbitmq.com/stream.html#flow-control).
If you want an external process to be fully in control of the flow control
of a consumer, you can set the `:flow_control` option to `false`. Then
you can call `RabbitMQStream.Consumer.credit/2` to manually add more
credits to the subscription.
If you want an external process to be fully in control of the flow control of a consumer, you can
set the `:flow_control` option to `false`. Then you can call `RabbitMQStream.Consumer.credit/2` to

Check warning on line 91 in lib/consumer/consumer.ex

View workflow job for this annotation

GitHub Actions / docs

documentation references function "RabbitMQStream.Consumer.credit/2" but it is undefined or private
manually add more credits to the subscription.
## Commited Offset
As of RabbitMQ 3.13, each deliver message now has the `commited_offset`. This respresents the 'next' offset, instead of the 'last'.
As of RabbitMQ 3.13, each deliver message now has the `commited_offset`. This respresents the 'next'
offset, instead of the 'last'.
Before 3.13, using the next offset meant that the tracking was always "off by one", meaning that everytime a Consumer started, it would
it would always receive the last message it read once again.
Before 3.13, using the next offset meant that the tracking was always "off by one", meaning that
everytime a Consumer started, it would it would always receive the last message it read once again.
As of 3.13 and forward, the tracking now prefers to use the `commited_offset` when storing it to the stream, meaning that if it has received a message
and then the offset is stored, it would not receive it again.
As of 3.13 and forward, the tracking now prefers to use the `commited_offset` when storing it to the
stream, meaning that if it has received a message and then the offset is stored, it would not receive it again.
# Configuration
You can configure each consumer with:
config :rabbitmq_stream, MyApp.MyConsumer,
Expand All @@ -98,8 +119,9 @@ defmodule RabbitMQStream.Consumer do
flow_control: [count: [credit_after: {:count, 1}]],
serializer: Jason
These options are overriden by the options passed to the `use` macro, which
are overriden by the options passed to `start_link/1`.
These options are overriden by the options passed to the `use` macro, which are overriden by the
options passed to `start_link/1`.
And also you can override the defaults of all consumers with:
Expand All @@ -118,6 +140,7 @@ defmodule RabbitMQStream.Consumer do
# Decoding
You can declare a function for decoding each message by declaring a `decode!/1` callback as follows:
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
Expand All @@ -131,6 +154,7 @@ defmodule RabbitMQStream.Consumer do
end
end
Or by passing a `:serializer` option to the `use` macro:
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
Expand All @@ -140,19 +164,23 @@ defmodule RabbitMQStream.Consumer do
serializer: Jason
end
The default value for the `:serializer` is the module itself, unless a default is defined at a higher level of the
configuration. If there is a `decode!/1` callback defined, it is always used
The default value for the `:serializer` is the module itself, unless a default is defined at a higher
level of the configuration. If there is a `decode!/1` callback defined, it is always used
# Properties
You can provide additional properties to the consumer to change its behavior, by passing `:properties`.
## Single active consumer
To use it, you must provide a "group_name". The server manages each consumer so the only one will of each group
will be receiving chunks at a time.
To use it, you must provide a "group_name". The server manages each consumer so the only one will
of each group will be receiving chunks at a time.
Although there is only one Consumer active, we must provide the server the offset a consumer starts on when being upgraded
to the being the active one. To do so you must implement the `handle_update/2` callback, which must return a `{:ok, offset}` tuple.
Although there is only one Consumer active, we must provide the server the offset a consumer starts
on when being upgraded to the being the active one. To do so you must implement the `handle_update/2`
callback, which must return a `{:ok, offset}` tuple.
@impl true
def handle_update(_, :upgrade) do
Expand Down

0 comments on commit fa80d34

Please sign in to comment.