Skip to content

Commit

Permalink
Merge pull request #68 from bancolombia/feature/rabbit-da
Browse files Browse the repository at this point in the history
feat: added rabbitmq/secrets manager driven adapters
  • Loading branch information
gabheadz authored Dec 6, 2023
2 parents a5535f3 + e72c8d2 commit 265e2b0
Show file tree
Hide file tree
Showing 23 changed files with 1,008 additions and 1 deletion.
4 changes: 4 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
bridge_rabbitmq-*.tar

# Temporary files, for example, from tests.
/tmp/
16 changes: 16 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# BridgeRabbitmq

Channel Bridge RabbitMQ Adapter.

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `bridge_rabbitmq` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:bridge_rabbitmq, "~> 0.1.0"}
]
end
```
6 changes: 6 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/lib/bridge_rabbitmq.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule BridgeRabbitmq do
@moduledoc """
Documentation for `BridgeRabbitmq`.
"""

end
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule BridgeRabbitmq.Application do
use Application
require Logger

@doc false
@impl Application
def start(_type, _args) do

children = case (Application.get_env(:bridge_core, :env)) do
e when e in [:test, :bench] ->
[
{Task.Supervisor, name: BridgeRabbitmq.TaskSupervisor, options: [max_restarts: 2]}
]
_ ->
[
build_child_spec(Application.get_env(:channel_bridge, :config)),
{Task.Supervisor, name: BridgeRabbitmq.TaskSupervisor, options: [max_restarts: 2]}
]
end

Logger.info("BridgeRabbitmq.Application starting...")

opts = [strategy: :one_for_one, name: BridgeRabbitmq.Supervisor]
Supervisor.start_link(children, opts)
end

defp build_child_spec(config) do
rabbit_config = get_in(config, [:bridge, "event_bus", "rabbitmq"])
new_config = Map.put_new(rabbit_config, "broker_url", parse_connection_details(rabbit_config))
{BridgeRabbitmq.Subscriber, [new_config]}
end

defp parse_connection_details(config) do
# If a secret is specified, obtain data from Aws Secretsmanager,
# otherwise build connection string from explicit keys in file
case get_in(config, ["secret"]) do
nil ->
build_uri(config)
secret_name ->
load_credentials_secret(secret_name)
end
end

defp load_credentials_secret(secret_name) do
Logger.info("Fetching RabbitMQ credentials from Secret...")
case BridgeSecretManager.get_secret(secret_name, [output: "json"]) do
{:ok, secret_json} ->
print_secret(secret_json)
build_uri(secret_json)
{:error, reason} ->
throw(reason)
end
end

defp build_uri(data) do
username = get_in(data, ["username"])
password = case get_in(data, ["password"]) do
nil -> nil
v -> URI.encode_www_form(v)
end
virtual_host = case get_in(data, ["virtualhost"]) do
nil -> ""
"/" -> ""
value -> "/#{value}"
end
host = get_in(data, ["hostname"])

ssl = case get_in(data, ["ssl"]) do
nil -> false
true -> true
false -> false
"true" -> true
"false" -> false
end

schema = case ssl do
false -> "amqp"
true -> "amqps"
end

ssl_props = case ssl do
false -> ""
true -> "?verify=verify_none&server_name_indication=#{host}"
end

"#{schema}://#{username}:#{password}@#{host}#{virtual_host}#{ssl_props}"
end

defp print_secret(secret_value) do
masked_secret = Map.replace!(secret_value, "password", "******")
Logger.debug("Rabbitmq Secret value: #{inspect(masked_secret)}")
end


end
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule BridgeRabbitmq.MessageProcessor do
@moduledoc """
Process messages received via event bus.
"""

alias BridgeCore.CloudEvent
alias BridgeCore.CloudEvent.Parser.DefaultParser
alias BridgeCore.CloudEvent.RoutingError

require Logger

@doc """
Receives the raw mesages from the event bus and performs the processing for each.
"""
def handle_message(input_json_message) do

# ----------------------------------------------------------------
# The delivery task is done under a supervisor in order to provide
# retry functionality
Task.Supervisor.start_child(
BridgeRabbitmq.TaskSupervisor,
fn ->

send_result = input_json_message
|> convert_to_cloud_event
|> find_process_and_deliver

case send_result do
:ok ->
Logger.debug("Success: Message routing requested.")
:ok

{:error, err, _cloud_event} ->
# Logger.error("Error: Message routing failed!, reason: #{inspect(err)}")
raise RoutingError, message: err
# :error
end
end,
restart: :transient
)
# End of delivery task---------------------------------------------

end

defp convert_to_cloud_event(json_message) do
start = System.monotonic_time()

case DefaultParser.parse(json_message) do
{:ok, data_map} ->
metric([:adf, :cloudevent, :parsing, :stop], start, %{})
CloudEvent.from(data_map)

{:error, reason} ->
metric([:adf, :cloudevent, :parsing, :exception], start, %{reason: reason})
{:error, reason, json_message}
end
end

defp find_process_and_deliver({:ok, cloud_event}) do
with {:ok, channel_alias} <- CloudEvent.extract_channel_alias(cloud_event),
:ok <- BridgeCore.route_message(channel_alias, cloud_event) do
:ok
else
{:error, :noproc} ->
{:error, "Unable to find a routing process tied to channel alias", cloud_event}

{:error, reason} ->
:telemetry.execute([:adf, :channel, :alias_missing], %{time: System.monotonic_time()}, %{
reason: reason
})

{:error, "Unable to extract channel alias from message", cloud_event}
end
end

defp find_process_and_deliver({:error, _reason, _message} = result) do
result
end

defp metric(event, start_time, metadata) do
:telemetry.execute(event, %{duration: System.monotonic_time() - start_time}, metadata)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule BridgeRabbitmq.Subscriber do
@moduledoc """
Module entrypoint for messages received via RabbitMQ
"""
use Broadway

require Logger

alias BridgeRabbitmq.MessageProcessor

def start_link(opts) do
config = List.first(opts)
Logger.debug("Starting RabbitMQ Producer Module")

Broadway.start_link(__MODULE__,
name: get_registration_name(config),
producer: [
module:
{get_producer_module(config),
queue: get_in(config, ["queue"]),
connection: get_in(config, ["broker_url"]),
name: "channel-bridge-listener",
qos: [
prefetch_count: get_in(config, ["producer_prefetch"])
],
declare: [durable: true],
bindings: process_bindings(get_in(config, ["bindings"])),
on_success: :ack,
on_failure: :reject_and_requeue_once,
after_connect: &declare_rabbitmq_topology/1,
metadata: []
},
concurrency: get_in(config, ["producer_concurrency"])
# rate_limiting: [
# allowed_messages: 1,
# interval: 100
# ],
],
processors: [
default: [
concurrency: get_in(config, ["processor_concurrency"]),
max_demand: get_in(config, ["processor_max_demand"])
]
]
)
end

def stop() do
Logger.debug("Stopping RabbitMQ Producer Module")
Broadway.stop(__MODULE__, :normal)
end

defp declare_rabbitmq_topology(amqp_channel) do
# TODO parametrize exchange name
AMQP.Exchange.declare(amqp_channel, "domainEvents", :topic, durable: true)
end

@impl true
def handle_message(_, message, _context) do
message.data
|> MessageProcessor.handle_message

message
end

defp process_bindings(bindings) do
case bindings do
nil ->
[]
_ ->
Enum.map(bindings, fn e ->
{get_in(e, ["name"]), [routing_key: List.first(get_in(e, ["routing_key"]))]}
end)
end

end

defp get_registration_name(config) do
case get_in(config, ["producer_name"]) do
nil ->
__MODULE__
value ->
String.to_atom(value)
end
end

defp get_producer_module(config) do
case get_in(config, ["producer_module"]) do
nil ->
BroadwayRabbitMQ.Producer
producer ->
Module.safe_concat([producer])
end
end

end
47 changes: 47 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule BridgeRabbitmq.MixProject do
use Mix.Project

def project do
[
app: :bridge_rabbitmq,
version: "0.1.0",
build_path: "../../_build",
config_path: "../../config/config.exs",
deps_path: "../../deps",
lockfile: "../../mix.lock",
elixir: "~> 1.15",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger], mod: {BridgeRabbitmq.Application, []}
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:bridge_core, in_umbrella: true},
{:bridge_helper_config, in_umbrella: true},
{:bridge_secretmanager, in_umbrella: true},
{:broadway_rabbitmq, "~> 0.7.0"},
{:ex_aws, "~> 2.2"},
{:ex_aws_sts, "~> 2.2"},
{:ex_aws_secretsmanager, "~> 2.0"},
{:configparser_ex, "~> 4.0"},
{:sweet_xml, "~> 0.6"},
{:vapor, "~> 0.10.0"},
# testing dependencies
{:mock, "~> 0.3.0", only: :test},
{:credo, "~> 1.6", only: [:dev, :test], runtime: false},
{:credo_sonarqube, "~> 0.1.3", only: [:dev, :test]},
{:sobelow, "~> 0.8", only: :dev},
{:excoveralls, "~> 0.10", only: :test},
{:ex_unit_sonarqube, "~> 0.1", only: :test}
]
end
end
Loading

0 comments on commit 265e2b0

Please sign in to comment.