diff --git a/.vscode/settings.json b/.vscode/settings.json index cac0e10..e8810b7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,9 @@ { - "editor.formatOnSave": true + "editor.formatOnSave": true, + "editor.wordWrapColumn": 100, + "editor.wrappingIndent": "deepIndent", + "editor.wordWrap": "on", + "editor.rulers": [ + 100 + ] } \ No newline at end of file diff --git a/lib/consumer/consumer.ex b/lib/consumer/consumer.ex index 9defb17..808d327 100644 --- a/lib/consumer/consumer.ex +++ b/lib/consumer/consumer.ex @@ -1,7 +1,7 @@ defmodule RabbitMQStream.Consumer do @moduledoc """ - Used to declare a Persistent Consumer module. It is able to process - chunks by implementing the `handle_chunk/1` or `handle_chunk/2` callbacks. + Used to declare a Persistent Consumer module. It is able to process chunks by implementing the + `handle_chunk/1` or `handle_chunk/2` callbacks. # Usage @@ -29,7 +29,9 @@ defmodule RabbitMQStream.Consumer do * `:offset_reference` - * `:private` - Private data that can hold any value, and is passed to the `handle_chunk/2` callback. * `:serializer` - The module to use to decode the message. Defaults to `__MODULE__`, - which means that the consumer will use the `decode!/1` callback to decode the message, which is implemented by default to return the message as is. + which means that the consumer will use the `decode!/1` callback to decode the message, which is + implemented by default to return the message as is. + * `:properties` - Define the properties of the subscription. Can only have one option at a time. * `:single_active_consumer`: set to `true` to enable [single active consumer](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/) for this subscription. @@ -40,53 +42,72 @@ defmodule RabbitMQStream.Consumer do # Offset Tracking - The consumer is able to track its progress in the stream by storing its - latests offset in the stream. Check [Offset Tracking with RabbitMQ Streams(https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/) for more information on - how offset tracking works. + The consumer is able to track its progress in the stream by storing its latests offset in the stream. + Check [Offset Tracking with RabbitMQ Streams(https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/) for more information on how offset tracking works. + + The consumer can be configured to use different offset tracking strategies, which decide when to + store the offset in the stream. You can implement your own strategy by implementing the + `RabbitMQStream.Consumer.OffsetTracking` behaviour, and passing it to the `:offset_tracking` option. + It defaults to `RabbitMQStream.Consumer.OffsetTracking.CountStrategy`, which stores the offset after, + by default, every 50_000 messages. + + You can use the default strategies by passing a shorthand alias: + + * `interval` : `RabbitMQStream.Consumer.OffsetTracking.IntervalStrategy` + * `count` : `RabbitMQStream.Consumer.OffsetTracking.CountStrategy` + + use RabbitMQStream.Consumer, + connection: MyApp.MyConnection, + stream_name: "my_stream", + initial_offset: :first, + offset_tracking: [ + count: [store_after: 50], + interval: [store_after: 5_000] + ] - The consumer can be configured to use different offset tracking strategies, - which decide when to store the offset in the stream. You can implement your - own strategy by implementing the `RabbitMQStream.Consumer.OffsetTracking` - behaviour, and passing it to the `:offset_tracking` option. It defaults to - `RabbitMQStream.Consumer.OffsetTracking.CountStrategy`, which stores the - offset after, by default, every 50_000 messages. # Flow Control - The RabbitMQ Streams server requires that the consumer declares how many - messages it is able to process at a time. This is done by informing an amount - of 'credits' to the server. After every chunk is sent, one credit is consumed, - and the server will send messages only if there are credits available. + The RabbitMQ Streams server requires that the consumer declares how many messages it is able to + process at a time. This is done by informing an amount of 'credits' to the server. After every chunk + is sent, one credit is consumed, and the server will send messages only if there are credits available. + + + We can configure the consumer to automatically request more credits based on a strategy. + By default it uses the `RabbitMQStream.Consumer.FlowControl.MessageCount`, which + requests 1 additional credit for every 1 processed chunk. Please check the + RabbitMQStream.Consumer.FlowControl.MessageCount module for more information. + - We can configure the consumer to automatically request more credits based on - a strategy. By default it uses the `RabbitMQStream.Consumer.FlowControl.MessageCount`, - which requests 1 additional credit for every 1 processed chunk. Please check - the RabbitMQStream.Consumer.FlowControl.MessageCount module for more information. + You can also call `RabbitMQStream.Consumer.credit/2` to manually add more credits to the subscription, + or implement your own strategy by implementing the `RabbitMQStream.Consumer.FlowControl` behaviour, + and passing it to the `:flow_control` option. - You can also call `RabbitMQStream.Consumer.credit/2` to manually add more - credits to the subscription, or implement your own strategy by implementing - the `RabbitMQStream.Consumer.FlowControl` behaviour, and passing - it to the `:flow_control` option. You can find more information on the [RabbitMQ Streams documentation](https://www.rabbitmq.com/stream.html#flow-control). - If you want an external process to be fully in control of the flow control - of a consumer, you can set the `:flow_control` option to `false`. Then - you can call `RabbitMQStream.Consumer.credit/2` to manually add more - credits to the subscription. + + If you want an external process to be fully in control of the flow control of a consumer, you can + set the `:flow_control` option to `false`. Then you can call `RabbitMQStream.Consumer.credit/2` to + manually add more credits to the subscription. + ## Commited Offset - As of RabbitMQ 3.13, each deliver message now has the `commited_offset`. This respresents the 'next' offset, instead of the 'last'. + As of RabbitMQ 3.13, each deliver message now has the `commited_offset`. This respresents the 'next' + offset, instead of the 'last'. + - Before 3.13, using the next offset meant that the tracking was always "off by one", meaning that everytime a Consumer started, it would - it would always receive the last message it read once again. + Before 3.13, using the next offset meant that the tracking was always "off by one", meaning that + everytime a Consumer started, it would it would always receive the last message it read once again. - As of 3.13 and forward, the tracking now prefers to use the `commited_offset` when storing it to the stream, meaning that if it has received a message - and then the offset is stored, it would not receive it again. + + As of 3.13 and forward, the tracking now prefers to use the `commited_offset` when storing it to the + stream, meaning that if it has received a message and then the offset is stored, it would not receive it again. # Configuration + You can configure each consumer with: config :rabbitmq_stream, MyApp.MyConsumer, @@ -98,8 +119,9 @@ defmodule RabbitMQStream.Consumer do flow_control: [count: [credit_after: {:count, 1}]], serializer: Jason - These options are overriden by the options passed to the `use` macro, which - are overriden by the options passed to `start_link/1`. + These options are overriden by the options passed to the `use` macro, which are overriden by the + options passed to `start_link/1`. + And also you can override the defaults of all consumers with: @@ -118,6 +140,7 @@ defmodule RabbitMQStream.Consumer do # Decoding + You can declare a function for decoding each message by declaring a `decode!/1` callback as follows: defmodule MyApp.MyConsumer do use RabbitMQStream.Consumer, @@ -131,6 +154,7 @@ defmodule RabbitMQStream.Consumer do end end + Or by passing a `:serializer` option to the `use` macro: defmodule MyApp.MyConsumer do use RabbitMQStream.Consumer, @@ -140,19 +164,23 @@ defmodule RabbitMQStream.Consumer do serializer: Jason end - The default value for the `:serializer` is the module itself, unless a default is defined at a higher level of the - configuration. If there is a `decode!/1` callback defined, it is always used + + The default value for the `:serializer` is the module itself, unless a default is defined at a higher + level of the configuration. If there is a `decode!/1` callback defined, it is always used # Properties You can provide additional properties to the consumer to change its behavior, by passing `:properties`. + ## Single active consumer - To use it, you must provide a "group_name". The server manages each consumer so the only one will of each group - will be receiving chunks at a time. + To use it, you must provide a "group_name". The server manages each consumer so the only one will + of each group will be receiving chunks at a time. + - Although there is only one Consumer active, we must provide the server the offset a consumer starts on when being upgraded - to the being the active one. To do so you must implement the `handle_update/2` callback, which must return a `{:ok, offset}` tuple. + Although there is only one Consumer active, we must provide the server the offset a consumer starts + on when being upgraded to the being the active one. To do so you must implement the `handle_update/2` + callback, which must return a `{:ok, offset}` tuple. @impl true def handle_update(_, :upgrade) do