Skip to content

Commit

Permalink
Merge pull request #7 from VictorGaiva/feat/3-13-support
Browse files Browse the repository at this point in the history
RabbitMQ 3.11 Support
  • Loading branch information
VictorGaiva authored Jan 19, 2024
2 parents 2dbbaee + d5fcae2 commit d952a44
Show file tree
Hide file tree
Showing 66 changed files with 4,867 additions and 2,663 deletions.
28 changes: 18 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ on:
- "mix.exs"
- "mix.lock"
- "test/**"
- ".github/workflows/ci.yaml"
push:
branches:
- main
paths:
- "lib/**"
- "mix.exs"
- "mix.lock"
- ".github/workflows/ci.yaml"

jobs:
test:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.11
ports:
- 5552:5552
strategy:
fail-fast: false
matrix:
version: ["3_13", "3_12", "3_11"]
steps:
- uses: erlef/setup-beam@v1
with:
Expand All @@ -33,10 +34,17 @@ jobs:

- uses: actions/checkout@v3

- name: Enable rabbitmq management plugin
run: |
DOCKER_NAME=$(docker ps --filter ancestor=rabbitmq:3.11 --format "{{.Names}}")
docker exec $DOCKER_NAME rabbitmq-plugins enable rabbitmq_stream
- uses: isbang/[email protected]
with:
compose-file: "./services/docker-compose.yaml"
services: "rabbitmq_stream_${{ matrix.version }}"

- name: Wait RabbitMQ is Up
run: sleep 10s
shell: bash

- name: Create 'invoices' SuperStream
run: docker exec rabbitmq_stream rabbitmq-streams add_super_stream invoices --partitions 3

- name: Install Dependencies
run: |
Expand All @@ -50,4 +58,4 @@ jobs:
run: mix compile

- name: Run tests
run: mix test
run: mix test --exclude test --include v${{ matrix.version }}
62 changes: 44 additions & 18 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,74 @@
# Changelog

## 0.1.0
## 0.4.0

Initial release with the following features:
Added support for RabbitMQ 3.13, with Route, Partitions and SuperStreams support.

- Opening connection to RabbitMQ server
- Declaring a Stream
- Creating a Stream Publisher
- Subscribing to Stream Messages
- Initial Hex Release
### 0.4.0 Features

## 0.2.0
- Support for `:consumerupdate`, `:exchangecommandversions`, `:streamstats`, commands.
- Serialization options for encoding and decoding messages.
- TLS Support
- Functional `single-active-consumer`.
- Initial support for `filter_value` consumer parameter, and `:createsuperstream`, `:deletesuperstream`, `:route`, `:partitions` commands.
- Initial support for SuperStreams, with RabbitMQStream.SuperConsumer and RabbitMQStream.SuperPublisher.

The main objective of this release is to remove the manually added code from `rabbitmq_stream_common`'s Erlang implementation of Encoding and Decoding logic, with frame buffering.
### 0.4.0 Changes

## 0.2.1
The 'Message' module tree was refactored to make all the Encoding and Decoding logic stay close to each other.

Documentation and Configuration refactoring
- Improved the cleanup logic for closing the connection.
- Publishers and Consumers now expects any name of a GenServer process, instead of a Module.
- Added checks on supported commands based on Server version, and exchanged commands versions.

- It is now possible to define the connection and subscriber parameters throught the `config.exs` file
- Documentation improvements, and examples
### 0.4.0 Breaking Changes

- Renamed `RabbitMQStream.Subscriber` to `RabbitMQStream.Consumer`
- Renamed `RabbitMQStream.Publisher` to `RabbitMQStream.Producer`

## 0.3.0

Added an implementation for a stream Subscriber, fixed bugs and improved the documentation.
Added an implementation for a stream Consumer, fixed bugs and improved the documentation.

### Features
### 0.3.0 Features

- Added the `:credit` command.
- Added `RabbitMQStream.Subscriber`, which subscribes to a stream, while tracking its offset and credit based on customizeable strategies.
- Added the possibility of globally configuring the default Connection for Publishers and Subscribers

### Bug Fixes
### 0.3.0 Bug Fixes

- Fixed an issue where tcp packages with multiple commands where not being correctly parsed, and in reversed order

### Changes
### 0.3.0 Changes

- `RabbitMQStream.Publisher` no longer calls `connect` on the Connection during its setup.
- Moved `RabbitMQStream.Publisher`'s setup logic into `handle_continue`, to prevent locking up the application startup.
- `RabbitMQStream.Publisher` no longer declares the stream if it doesn't exists.
- `RabbitMQStream.Publisher` module now can optionally declare a `before_start/2` callback, which is called before it calls `declare_publisher/2`, and can be used to create the stream if it doesn't exists.
- `RabbitMQStream.Connection` now buffers the requests while the connection is not yet `:open`.

### Breaking Changes
### 0.3.0 Breaking Changes

- Subscription deliver messages are now in the format `{:chunk, %RabbitMQ.OsirisChunk{}}`.

## 0.2.1

Documentation and Configuration refactoring

- It is now possible to define the connection and subscriber parameters throught the `config.exs` file
- Documentation improvements, and examples

## 0.2.0

The main objective of this release is to remove the manually added code from `rabbitmq_stream_common`'s Erlang implementation of Encoding and Decoding logic, with frame buffering.

## 0.1.0

Initial release with the following features:

- Opening connection to RabbitMQ server
- Declaring a Stream
- Creating a Stream Publisher
- Subscribing to Stream Messages
- Initial Hex Release
48 changes: 36 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Zero dependencies Elixir Client for [RabbitMQ Streams Protocol](https://www.rabb

## Usage

### Subscribing to stream
### Consuming from stream

First you define a connection

Expand All @@ -20,23 +20,23 @@ defmodule MyApp.MyConnection do
end
```

You then can declare a subscriber module with the `RabbitMQStream.Subscriber`:
You then can declare a consumer module with the `RabbitMQStream.Consumer`:

```elixir
defmodule MyApp.MySubscriber do
use RabbitMQStream.Subscriber,
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first

@impl true
def handle_chunk(%RabbitMQStream.OsirisChunk{}=_chunk, _subscriber) do
def handle_chunk(%RabbitMQStream.OsirisChunk{}=_chunk, _consumer) do
:ok
end
end
```

Or you could manually subscribe to the stream with
Or you could manually consume from the stream with

```elixir
{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)
Expand All @@ -51,17 +51,17 @@ def handle_info({:chunk, %RabbitMQStream.OsirisChunk{} = chunk}, state) do
end
```

You can take a look at an example Subscriber GenServer at the [Subscribing Documentation](guides/tutorial/subscribing.md).
You can take a look at an example Consumer GenServer at the [Consuming Documentation](guides/tutorial/consuming.md).

### Publishing to stream

RabbitMQ Streams protocol needs a static `:reference_name` per publisher. This is used to prevent message duplication. For this reason, each stream needs, for now, a static module to publish messages, which keeps track of its own `publishing_id`.
RabbitMQ Streams protocol needs a static `:reference_name` per producer. This is used to prevent message duplication. For this reason, each stream needs, for now, a static module to publish messages, which keeps track of its own `publishing_id`.

You can define a `Publisher` module like this:
You can define a `Producer` module like this:

```elixir
defmodule MyApp.MyPublisher do
use RabbitMQStream.Publisher,
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream: "stream-01",
connection: MyApp.MyConnection
end
Expand All @@ -70,7 +70,7 @@ end
Then you can publish messages to the stream:

```elixir
MyApp.MyPublisher.publish("Hello World")
MyApp.MyProducer.publish("Hello World")
```

## Installation
Expand Down Expand Up @@ -99,4 +99,28 @@ end

```

You can configure a default Serializer module by passing it to the defaults configuration option

```elixir
config :rabbitmq_stream, :defaults,
serializer: Jason
end
```

## TLS Support

You can configure the RabbitmqStream to use TLS connections:

```elixir
coonfig :rabbitmq_stream, :defaults,
connection: [
transport: :ssl,
ssl_opts: [
keyfile: "services/cert/client_box_key.pem",
certfile: "services/cert/client_box_certificate.pem",
cacertfile: "services/cert/ca_certificate.pem"
]
]
```

For more information, check the [documentation](https://hexdocs.pm/rabbitmq_stream/).
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@ import Config

# Prevents the CI from being spammed with logs
config :logger, :level, :info

config :rabbitmq_stream, :defaults,
connection: [
port: 5553
]
64 changes: 32 additions & 32 deletions docs/support-table.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
# Support Table

| Command | Supported? |
|-------------------------|------------|
| declarepublisher | ✔️ |
| publish | ✔️ |
| publishconfirm | ✔️ |
| publisherror | ✔️ |
| querypublishersequence | ✔️ |
| deletepublisher | ✔️ |
| subscribe | ✔️ |
| deliver | ✔️ |
| credit | ✔️ |
| storeoffset | ✔️ |
| queryoffset | ✔️ |
| unsubscribe | ✔️ |
| create | ✔️ |
| delete | ✔️ |
| metadata | ✔️ |
| metadataupdate | ✔️ |
| peerproperties | ✔️ |
| saslhandshake | ✔️ |
| saslauthenticate | ✔️ |
| tune | ✔️ |
| open | ✔️ |
| close | ✔️ |
| heartbeat | ✔️ |
| route | |
| partitions | |
| consumerupdate | |
| exchangecommandversions | |
| streamstats | |
| createsuperstream | |
| deletesuperstream | |
| Command | Supported? | Minimal version |
|-------------------------|------------|-----------------|
| declarepublisher | ✔️ | 3.9 |
| publish | ✔️ | 3.9 |
| publishconfirm | ✔️ | 3.9 |
| publisherror | ✔️ | 3.9 |
| querypublishersequence | ✔️ | 3.9 |
| deletepublisher | ✔️ | 3.9 |
| subscribe | ✔️ | 3.9 |
| deliver | ✔️ | 3.9 |
| credit | ✔️ | 3.9 |
| storeoffset | ✔️ | 3.9 |
| queryoffset | ✔️ | 3.9 |
| unsubscribe | ✔️ | 3.9 |
| create | ✔️ | 3.9 |
| delete | ✔️ | 3.9 |
| metadata | ✔️ | 3.9 |
| metadataupdate | ✔️ | 3.9 |
| peerproperties | ✔️ | 3.9 |
| saslhandshake | ✔️ | 3.9 |
| saslauthenticate | ✔️ | 3.9 |
| tune | ✔️ | 3.9 |
| open | ✔️ | 3.9 |
| close | ✔️ | 3.9 |
| heartbeat | ✔️ | 3.9 |
| consumerupdate | ✔️ | 3.11 |
| streamstats | ✔️ | 3.11 |
| exchangecommandversions | ✔️ | 3.13 |
| createsuperstream | ✔️ | 3.13 |
| deletesuperstream | ✔️ | 3.13 |
| route | ✔️ | 3.13 |
| partitions | ✔️ | 3.13 |
Loading

0 comments on commit d952a44

Please sign in to comment.