Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added rabbitmq/secrets manager driven adapters #68

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
bridge_rabbitmq-*.tar

# Temporary files, for example, from tests.
/tmp/
16 changes: 16 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# BridgeRabbitmq

Channel Bridge RabbitMQ Adapter.

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `bridge_rabbitmq` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:bridge_rabbitmq, "~> 0.1.0"}
]
end
```
6 changes: 6 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/lib/bridge_rabbitmq.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule BridgeRabbitmq do
@moduledoc """
Documentation for `BridgeRabbitmq`.
"""

end
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule BridgeRabbitmq.Application do
use Application
require Logger

@doc false
@impl Application
def start(_type, _args) do

children = case (Application.get_env(:bridge_core, :env)) do
e when e in [:test, :bench] ->
[
{Task.Supervisor, name: BridgeRabbitmq.TaskSupervisor, options: [max_restarts: 2]}
]
_ ->
[
build_child_spec(Application.get_env(:channel_bridge, :config)),
{Task.Supervisor, name: BridgeRabbitmq.TaskSupervisor, options: [max_restarts: 2]}
]
end

Logger.info("BridgeRabbitmq.Application starting...")

opts = [strategy: :one_for_one, name: BridgeRabbitmq.Supervisor]
Supervisor.start_link(children, opts)
end

defp build_child_spec(config) do
rabbit_config = get_in(config, [:bridge, "event_bus", "rabbitmq"])
new_config = Map.put_new(rabbit_config, "broker_url", parse_connection_details(rabbit_config))
{BridgeRabbitmq.Subscriber, [new_config]}
end

defp parse_connection_details(config) do
# If a secret is specified, obtain data from Aws Secretsmanager,
# otherwise build connection string from explicit keys in file
case get_in(config, ["secret"]) do
nil ->
build_uri(config)
secret_name ->
load_credentials_secret(secret_name)
end
end

defp load_credentials_secret(secret_name) do
Logger.info("Fetching RabbitMQ credentials from Secret...")
case BridgeSecretManager.get_secret(secret_name, [output: "json"]) do
{:ok, secret_json} ->
print_secret(secret_json)
build_uri(secret_json)
{:error, reason} ->
throw(reason)
end
end

defp build_uri(data) do
username = get_in(data, ["username"])
password = case get_in(data, ["password"]) do
nil -> nil
v -> URI.encode_www_form(v)
end
virtual_host = case get_in(data, ["virtualhost"]) do
nil -> ""
"/" -> ""
value -> "/#{value}"
end
host = get_in(data, ["hostname"])

ssl = case get_in(data, ["ssl"]) do
nil -> false
true -> true
false -> false
"true" -> true
"false" -> false
end

schema = case ssl do
false -> "amqp"
true -> "amqps"
end

ssl_props = case ssl do
false -> ""
true -> "?verify=verify_none&server_name_indication=#{host}"
end

"#{schema}://#{username}:#{password}@#{host}#{virtual_host}#{ssl_props}"
end

defp print_secret(secret_value) do
masked_secret = Map.replace!(secret_value, "password", "******")
Logger.debug("Rabbitmq Secret value: #{inspect(masked_secret)}")
end


end
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule BridgeRabbitmq.MessageProcessor do
@moduledoc """
Process messages received via event bus.

"""

alias BridgeCore.CloudEvent
alias BridgeCore.CloudEvent.Parser.DefaultParser
alias BridgeCore.CloudEvent.RoutingError

require Logger

@doc """
Receives the raw mesages from the event bus and performs the processing for each.
"""
def handle_message(input_json_message) do

# ----------------------------------------------------------------
# The delivery task is done under a supervisor in order to provide
# retry functionality
Task.Supervisor.start_child(
BridgeRabbitmq.TaskSupervisor,
fn ->

send_result = input_json_message
|> convert_to_cloud_event
|> find_process_and_deliver

case send_result do
:ok ->
Logger.debug("Success: Message routing requested.")
:ok

{:error, err, _cloud_event} ->
# Logger.error("Error: Message routing failed!, reason: #{inspect(err)}")
raise RoutingError, message: err
# :error
end
end,
restart: :transient
)
# End of delivery task---------------------------------------------

end

defp convert_to_cloud_event(json_message) do
start = System.monotonic_time()

case DefaultParser.parse(json_message) do
{:ok, data_map} ->
metric([:adf, :cloudevent, :parsing, :stop], start, %{})
CloudEvent.from(data_map)

{:error, reason} ->
metric([:adf, :cloudevent, :parsing, :exception], start, %{reason: reason})
{:error, reason, json_message}
end
end

defp find_process_and_deliver({:ok, cloud_event}) do
with {:ok, channel_alias} <- CloudEvent.extract_channel_alias(cloud_event),
:ok <- BridgeCore.route_message(channel_alias, cloud_event) do
:ok
else
{:error, :noproc} ->
{:error, "Unable to find a routing process tied to channel alias", cloud_event}

{:error, reason} ->
:telemetry.execute([:adf, :channel, :alias_missing], %{time: System.monotonic_time()}, %{
reason: reason
})

{:error, "Unable to extract channel alias from message", cloud_event}
end
end

defp find_process_and_deliver({:error, _reason, _message} = result) do
result
end

defp metric(event, start_time, metadata) do
:telemetry.execute(event, %{duration: System.monotonic_time() - start_time}, metadata)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule BridgeRabbitmq.Subscriber do
@moduledoc """
Module entrypoint for messages received via RabbitMQ
"""
use Broadway

require Logger

alias BridgeRabbitmq.MessageProcessor

def start_link(opts) do
config = List.first(opts)
Logger.debug("Starting RabbitMQ Producer Module")

Broadway.start_link(__MODULE__,
name: get_registration_name(config),
producer: [
module:
{get_producer_module(config),
queue: get_in(config, ["queue"]),
connection: get_in(config, ["broker_url"]),
name: "channel-bridge-listener",
qos: [
prefetch_count: get_in(config, ["producer_prefetch"])
],
declare: [durable: true],
bindings: process_bindings(get_in(config, ["bindings"])),
on_success: :ack,
on_failure: :reject_and_requeue_once,
after_connect: &declare_rabbitmq_topology/1,
metadata: []
},
concurrency: get_in(config, ["producer_concurrency"])
# rate_limiting: [
# allowed_messages: 1,
# interval: 100
# ],
],
processors: [
default: [
concurrency: get_in(config, ["processor_concurrency"]),
max_demand: get_in(config, ["processor_max_demand"])
]
]
)
end

def stop() do
Logger.debug("Stopping RabbitMQ Producer Module")
Broadway.stop(__MODULE__, :normal)
end

defp declare_rabbitmq_topology(amqp_channel) do
# TODO parametrize exchange name
AMQP.Exchange.declare(amqp_channel, "domainEvents", :topic, durable: true)
end

@impl true
def handle_message(_, message, _context) do
message.data
|> MessageProcessor.handle_message

message
end

defp process_bindings(bindings) do
case bindings do
nil ->
[]
_ ->
Enum.map(bindings, fn e ->
{get_in(e, ["name"]), [routing_key: List.first(get_in(e, ["routing_key"]))]}
end)
end

end

defp get_registration_name(config) do
case get_in(config, ["producer_name"]) do
nil ->
__MODULE__
value ->
String.to_atom(value)
end
end

defp get_producer_module(config) do
case get_in(config, ["producer_module"]) do
nil ->
BroadwayRabbitMQ.Producer
producer ->
Module.safe_concat([producer])
end
end

end
47 changes: 47 additions & 0 deletions channel-bridge/apps/bridge_rabbitmq/mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule BridgeRabbitmq.MixProject do
use Mix.Project

def project do
[
app: :bridge_rabbitmq,
version: "0.1.0",
build_path: "../../_build",
config_path: "../../config/config.exs",
deps_path: "../../deps",
lockfile: "../../mix.lock",
elixir: "~> 1.15",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger], mod: {BridgeRabbitmq.Application, []}
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:bridge_core, in_umbrella: true},
{:bridge_helper_config, in_umbrella: true},
{:bridge_secretmanager, in_umbrella: true},
{:broadway_rabbitmq, "~> 0.7.0"},
{:ex_aws, "~> 2.2"},
{:ex_aws_sts, "~> 2.2"},
{:ex_aws_secretsmanager, "~> 2.0"},
{:configparser_ex, "~> 4.0"},
{:sweet_xml, "~> 0.6"},
{:vapor, "~> 0.10.0"},
# testing dependencies
{:mock, "~> 0.3.0", only: :test},
{:credo, "~> 1.6", only: [:dev, :test], runtime: false},
{:credo_sonarqube, "~> 0.1.3", only: [:dev, :test]},
{:sobelow, "~> 0.8", only: :dev},
{:excoveralls, "~> 0.10", only: :test},
{:ex_unit_sonarqube, "~> 0.1", only: :test}
]
end
end
Loading
Loading