Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main client module, with additional features #17

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ jobs:

- uses: actions/checkout@v3

- uses: isbang/compose-action@v1.5.1
- uses: hoverkraft-tech/compose-action@2.0.1
with:
compose-file: "./services/docker-compose.yaml"
services: "rabbitmq_stream_${{ matrix.version }}"
- name: Wait RabbitMQ is Up

- name: Wait until RabbitMQ is Up
run: sleep 10s
shell: bash

- name: Create 'invoices' SuperStream
run: docker exec rabbitmq_stream rabbitmq-streams add_super_stream invoices --partitions 3

Expand All @@ -79,4 +79,44 @@ jobs:
run: mix compile

- name: Run tests
run: mix test --exclude test --include v${{ matrix.version }}
run: mix test --exclude test --include v${{ matrix.version }}
test-clustered:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
version: ["3_13"]
elixir: ["1.15", "1.16"]
otp: ["26"]
steps:
- uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.otp }}
elixir-version: ${{ matrix.elixir }}

- uses: actions/checkout@v3

- uses: hoverkraft-tech/[email protected]
with:
compose-file: "./services/cluster/docker-compose.yaml"

- name: Wait util RabbitMQ is Up
run: sleep 20s
shell: bash

- name: Create 'invoices' SuperStream
run: docker exec rabbitmq1 rabbitmq-streams add_super_stream invoices --partitions 3

- name: Install Dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.unlock --all
mix deps.get
mix deps.compile

- name: Compile app
run: mix compile

- name: Run tests
run: mix test --exclude test --include v${{ matrix.version }}
21 changes: 16 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,24 @@ Make your change. Add tests for your change. Make the tests pass:
To run the Clustered test, run:

# Start the cluster
docker compose --project-directory services/cluster up -d
docker-compose --project-directory services/cluster up -d
# or for proxied
docker compose --project-directory services/proxied-cluster up -d
# docker-compose --project-directory services/proxied-cluster up -d

docker run --network cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" --rm -it --entrypoint bash elixir:1.17.2-otp-26

cd /home/rabbitmq-stream

mix local.rebar --force
mix local.hex --force
mix deps.unlock --all
mix deps.get
mix deps.compile
mix compile

# Run the tests
mix test --exclude test --include v3_13_proxied_cluster
mix test --exclude test --include v3_13_cluster

docker run --network proxied-cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" -P elixir:1.17.2-otp-26 /bin/sh -c 'cd /home/rabbitmq-stream; ./services/cluster/test.sh'
<!-- docker run --network proxied-cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" -P elixir:1.17.2-otp-26 /bin/sh -c 'cd /home/rabbitmq-stream; ./services/cluster/test.sh' -->

<!-- docker run --network proxied-cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" --rm -it --entrypoint bash elixir:1.17.2-otp-26-->
<!-- docker run --network cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" --rm -it --entrypoint bash elixir:1.17.2-otp-26-->
88 changes: 88 additions & 0 deletions lib/client/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
defmodule RabbitMQStream.Client do
@moduledoc """
Client for connection to a RabbitMQ Stream Cluster.

This module provides the API for setting up and managing a connection to multiple RabbitMQ Stream
nodes in a cluster. It implements RabbitMQStream.Connection.Behavior, but with added functionality
related to managing multiple connections.


## Lifecycle

At startup, the client creates a`RabbitMQStream.Connection` to the provided host. It is used to
discover other nodes, mainly using the `query_metadata` command.


"""

defmacro __using__(opts) do
quote location: :keep do
@opts unquote(opts)

def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end

def start_link(opts \\ []) when is_list(opts) do
opts =
Application.get_env(:rabbitmq_stream, __MODULE__, [])
|> Keyword.merge(@opts)
|> Keyword.merge(opts)
|> Keyword.put(:name, __MODULE__)

RabbitMQStream.Client.start_link(opts)
end

def stop(reason \\ :normal, timeout \\ :infinity) do
GenServer.stop(__MODULE__, reason, timeout)
end
end
end

def start_link(opts \\ []) when is_list(opts) do
opts =
Application.get_env(:rabbitmq_stream, :defaults, [])
|> Keyword.get(:client, [])
|> Keyword.merge(opts)

GenServer.start_link(RabbitMQStream.Client.Lifecycle, opts, name: opts[:name])
end

def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end

defdelegate subscribe(server, stream_name, pid, offset, credit, properties \\ []), to: RabbitMQStream.Connection

@type client_option ::
{:auto_discovery, boolean()}
| {:max_retries, non_neg_integer()}
| {:proxied?, boolean()}

defstruct [
:max_retries,
status: :setup,
opts: [],
control: nil,
proxied?: false,
monitors: %{},
clients: %{},
client_sequence: 0
]

@type t :: %__MODULE__{
control: pid() | nil,
status: :open | :setup | :closed,
client_sequence: non_neg_integer(),
proxied?: boolean(),
# Maps each subscriber to the connection's PID, so that we can garbage collect it when the subscriber dies
monitors: %{
reference() => {type :: :brooker | :subscriber | :producer, other :: reference(), id :: non_neg_integer()}
},
clients: %{
reference() => {type :: :subscriber | :producer, subscriber :: pid(), connection :: pid(), args :: term()}
},
max_retries: non_neg_integer() | nil,
opts: [RabbitMQStream.Connection.connection_options() | client_option()]
}
end
Loading
Loading