-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Performance Optimizations #9
Conversation
Side question: How do I decode what's in the chunk's entries? These messages are still encoded with headers, etc, right? Do you know how to deserialize them? Edit: I found this: Mix.install([:amqp10_common])
amsg = :amqp10_binary_parser.parse_all(msg) |
Thank you very much for the contribution. I'll work on adding a test that covers this case.
I'm not sure how the RabbitMQ AMQP stores the messages when interoping with Streams. The specification only states that each entry has the binary content of each message. You should also be able to add a custom decoder for each message in the stream ast the following: defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first
@impl true
def handle_chunk(%RabbitMQStream.OsirisChunk{} = _chunk, _consumer) do
:ok
end
def decode!(message) do
Jason.decode!(message)
end
end Or by passing a Since the |
And again. Thank you very much for the contribution. ❤️ |
From https://rabbitmq.com/streams.html#limitations-message-encoding:
|
Yep, that's it. But only for when interoping between queues/exchanges and a Stream. When producing and consuming using the Streams Protocol, the messages are stored as the raw binary passed by the user. But it makes a lot of sense to add this information to the documentation. |
Still playing around. iex> msg = <<0, 83, 114, 193, 72, 6, 163, 13, 120, 45, 114, 111, 117, 116, 105, 110, 103, 45, 107, 101,
121, 161, 0, 163, 10, 120, 45, 101, 120, 99, 104, 97, 110, 103, 101, 161, 15, 116, 101, 115,
116, 45, 115, 116, 114, 101, 97, 109, 45, 120, 99, 104, 163, 21, 120, 45, 98, 97, 115, 105,
99, 45, 100, 101, 108, 105, 118, 101, 114, 121, 45, 109, 111, 100, 101, 80, 2, 0, 83, 117,
160, 4, 97, 115, 100, 102>>
...> :amqp10_framing.decode_bin(msg)
[
"v1_0.message_annotations": [
{{:symbol, "x-routing-key"}, {:utf8, ""}},
{{:symbol, "x-exchange"}, {:utf8, "test-stream-xch"}},
{{:symbol, "x-basic-delivery-mode"}, {:ubyte, 2}}
],
"v1_0.data": "asdf"
] |
Yes! Nice! Something like this should probably be the default impl. of |
I don't believe so, as a user might interact with the Streams by only using the Stream Protocol itself, which doesn't use the 'AMP1.0' binary format, and would make the default implementation break. |
oh that's possible? I'm still learning about these streams. Until now I had the impression you'd always send your message via exchange => stream => consumer |
The producer implemented by this client is able to send the messages directly using the Streams Protocol itself, directly to each Stream. I recommend the following videos that give a great introduction on the topic: Introducing Stream Support in RabbitMQ |
Okay. I think I get it. But please correct me if I'm wrong...
edit: I've just verified myself that this is true. If I use the lib to publish a message, the entry is "just the message". Not amqp 1.0 encoded |
Yep that's right. You can check the docs on each module, and also at the guides folder. They are not yet published to HexDocs since I'm still improving them. |
Hey, I've stumbled upon this library while playing with streams. I have 2 proposals for performance optimisations. I understand these chunks can contain thousands of messages. And appending an element to a list is a costly operation the longer the list gets (
O(n)
). This PR tries to optimise that.NOTE: My first commit is tested by the test suite. However, there are not tests for the code I changed with my second commit. This needs to be verified!