Skip to content

Commit

Permalink
fix(topology): Add topology creation for sender
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis committed Jun 17, 2024
1 parent 40b8497 commit fb111f2
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 16 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.15.6-otp-26
erlang 26.1.1
elixir 1.16.2-otp-26
erlang 26.2.5
5 changes: 2 additions & 3 deletions lib/reactive_commons/api/direct_async_gateway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule DirectAsyncGateway do
@moduledoc """
This module allows the commands emission and async queries requests.
"""
@direct_exchange "directMessages"

def request_reply(%AsyncQuery{}, nil), do: raise("nil target")

Expand All @@ -12,7 +11,7 @@ defmodule DirectAsyncGateway do
msg =
OutMessage.new(
headers: headers(query, correlation_id),
exchange_name: @direct_exchange,
exchange_name: MessageContext.direct_exchange_name(),
routing_key: target_name <> ".query",
payload: Poison.encode!(query)
)
Expand Down Expand Up @@ -50,7 +49,7 @@ defmodule DirectAsyncGateway do
msg =
OutMessage.new(
headers: headers(),
exchange_name: @direct_exchange,
exchange_name: MessageContext.direct_exchange_name(),
routing_key: target_name,
payload: Poison.encode!(command)
)
Expand Down
3 changes: 1 addition & 2 deletions lib/reactive_commons/api/domain_event_bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ defmodule DomainEventBus do
@moduledoc """
This module allows the domain events emission.
"""
@domain_events_exchange "domainEvents"

def emit(event = %DomainEvent{name: name}) do
msg =
OutMessage.new(
headers: headers(),
exchange_name: @domain_events_exchange,
exchange_name: MessageContext.events_exchange_name(),
routing_key: name,
payload: Poison.encode!(event)
)
Expand Down
6 changes: 6 additions & 0 deletions lib/reactive_commons/config/async_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ defmodule AsyncConfig do
MessageSender: SenderConn,
ListenerController: SenderConn,
},
topology: %{
command_sender: false,
queries_sender: false,
events_sender: false
},
queries_reply: true,
with_dlq_retry: false,
retry_delay: 500,
Expand All @@ -42,6 +47,7 @@ defmodule AsyncConfig do
:direct_exchange,
:reply_routing_key,
:connection_assignation,
:topology,
:queries_reply,
:connection_props,
:with_dlq_retry,
Expand Down
15 changes: 15 additions & 0 deletions lib/reactive_commons/messaging/message_sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule MessageSender do
@impl true
def handle_info({:connected, conn}, _state) do
{:ok, chan} = AMQP.Channel.open(conn)
create_topology(chan)
{:noreply, %__MODULE__{chan: chan, conn: conn}}
end

Expand Down Expand Up @@ -71,4 +72,18 @@ defmodule MessageSender do

AMQP.Basic.publish(chan, message.exchange_name, message.routing_key, message.payload, options)
end

defp create_topology(chan) do
opts = MessageContext.topology()
# Topology
if opts.command_sender || opts.queries_sender do
direct_exchange = MessageContext.direct_exchange_name()
:ok = AMQP.Exchange.declare(chan, direct_exchange, :direct, durable: true)
end

if opts.events_sender do
events_exchange_name = MessageContext.events_exchange_name()
:ok = AMQP.Exchange.declare(chan, events_exchange_name, :topic, durable: true)
end
end
end
6 changes: 6 additions & 0 deletions lib/reactive_commons/runtime/message_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ defmodule MessageContext do
MessageSender: SenderConn,
ListenerController: SenderConn
},
topology: %{
command_sender: false,
queries_sender: false,
events_sender: false
},
queries_reply: true,
with_dlq_retry: false,
retry_delay: 500,
Expand Down Expand Up @@ -96,6 +101,7 @@ defmodule MessageContext do
def max_retries, do: config().max_retries
def prefetch_count, do: config().prefetch_count
def application_name, do: config().application_name
def topology, do: config().topology

def config do
[{:conf, config}] = :ets.lookup(@table_name, :conf)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule ReactiveCommons.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:poison, "~> 5.0"},
{:poison, "~> 6.0 or ~> 5.0"},
{:amqp, "~> 3.3"},
{:uuid, "~> 1.1"},
{:telemetry, "~> 1.2"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"},
"poison": {:hex, :poison, "6.0.0", "9bbe86722355e36ffb62c51a552719534257ba53f3271dacd20fbbd6621a583a", [:mix], [{:decimal, "~> 2.1", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "bb9064632b94775a3964642d6a78281c07b7be1319e0016e1643790704e739a2"},
"rabbit_common": {:hex, :rabbit_common, "3.12.12", "b87525eb02bcd738463d9c64b00f380c30825b6dcfb93c2e1d9168cad239012c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "027b5bf9de22d04198b1c02bc82fc01c9ed4129673218365d728db93a982f424"},
"recase": {:hex, :recase, "0.7.0", "3f2f719f0886c7a3b7fe469058ec539cb7bbe0023604ae3bce920e186305e5ae", [:mix], [], "hexpm", "36f5756a9f552f4a94b54a695870e32f4e72d5fad9c25e61bc4a3151c08a4e0c"},
"recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"},
Expand Down
7 changes: 6 additions & 1 deletion samples/query-client/config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import Config

config :query_client,
async_config: %{
application_name: "sample-query-client"
application_name: "sample-query-client",
topology: %{
command_sender: true,
queries_sender: true,
events_sender: true
}
},
http_port: 4001
2 changes: 1 addition & 1 deletion samples/query-client/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule QueryServer.MixProject do
[
# {:dep_from_hexpm, "~> 0.3.0"},
{:reactive_commons, path: "../..", override: true},
{:plug_cowboy, "~> 2.2"},
{:plug_cowboy, "~> 2.7"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
Expand Down
10 changes: 5 additions & 5 deletions samples/query-client/mix.lock
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
%{
"amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"},
"amqp_client": {:hex, :amqp_client, "3.12.10", "dcc0d5d0037fa2b486c6eb8b52695503765b96f919e38ca864a7b300b829742d", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.10", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "16a23959899a82d9c2534ed1dcf1fa281d3b660fb7f78426b880647f0a53731f"},
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
"cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"},
"plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
"plug": {:hex, :plug, "1.16.0", "1d07d50cb9bb05097fdf187b31cf087c7297aafc3fed8299aac79c128a707e47", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbf53aa1f5c4d758a7559c0bd6d59e286c2be0c6a1fac8cc3eee2f638243b93e"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"},
"rabbit_common": {:hex, :rabbit_common, "3.12.10", "7fc633ee206ae48783d8a5302dfc8fe1e086a5d7de494785ed206f586ad64b34", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "908a8b1bd059f5baefe225fe9d3e2545d35a28db8f6a14d60372556ca7afe641"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
Expand Down

0 comments on commit fb111f2

Please sign in to comment.