Skip to content

Commit

Permalink
Merge pull request #64 from bancolombia/feature/improved-elixir-client
Browse files Browse the repository at this point in the history
feat: elixir client improvements
  • Loading branch information
gabheadz authored Nov 27, 2023
2 parents 65181d3 + 025c70d commit 433b65e
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 72 deletions.
74 changes: 62 additions & 12 deletions clients/backend-client-elixir/lib/adf_sender_connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,45 @@ defmodule AdfSenderConnector do

alias AdfSenderConnector.{Credentials, Router, Message}

@typedoc """
Channel sender base URL
"""
@type sender_url :: String.t()

@typedoc """
Application reference
"""
@type application_ref :: String.t()

@typedoc """
User reference
"""
@type user_ref :: String.t()

@typedoc """
Channel reference
"""
@type channel_ref :: String.t()

@typedoc """
Event name
"""
@type event_name :: String.t()

@typedoc """
Event payload as a Message struct
"""
@type message :: %Message{}


@typedoc """
Event payload as a Map
"""
@type message_data :: map()

@default_local "http://localhost:8081"


@doc """
starts the process
"""
Expand All @@ -21,7 +58,7 @@ defmodule AdfSenderConnector do

def start_link() do
HTTPoison.start
Logger.warn("No sender endpoint provided. Using default endpoint https://localhost:8081")
Logger.warning("No sender endpoint provided. Using default endpoint https://localhost:8081")
DynamicSupervisor.start_link(__MODULE__, [sender_url: @default_local], name: __MODULE__)
end

Expand Down Expand Up @@ -53,37 +90,49 @@ defmodule AdfSenderConnector do
}
end

@spec channel_registration(any, any, any) :: {:ok, any()} | {:error, any()}
@spec channel_registration(application_ref(), user_ref(), list()) :: {:ok, map()} | {:error, any()}
@doc """
Request a channel registration
"""
def channel_registration(application_ref, user_ref, options \\ []) do
new_ch = DynamicSupervisor.start_child(__MODULE__,
case find_creds_process(application_ref <> "." <> user_ref) do
:noproc ->
{:ok, pid} = start_creds_proc(application_ref, user_ref, options)
Credentials.exchange_credentials(pid)
pid ->
Credentials.exchange_credentials(pid)
end
end

defp start_creds_proc(application_ref, user_ref, options) do
DynamicSupervisor.start_child(__MODULE__,
Credentials.child_spec([
app_ref: application_ref,
user_ref: user_ref,
name: application_ref <> "." <> user_ref] ++ options))
end

case new_ch do
{:ok, pid} ->
Credentials.exchange_credentials(pid)
_ ->
new_ch
defp find_creds_process(name) do
case Registry.lookup(Registry.ADFSenderConnector, name) do
[{pid, _}] ->
pid
[] ->
:noproc
end
end


@doc """
Starts a process to deliver messages.
"""
@spec start_router_process(any, any) :: :ok | {:error, any()}
@spec start_router_process(channel_ref(), list()) :: :ok | {:error, any()}
def start_router_process(channel_ref, options \\ []) do
new_options = Keyword.delete(options, :name)
DynamicSupervisor.start_child(AdfSenderConnector, AdfSenderConnector.Router.child_spec([name: channel_ref] ++ new_options))
Logger.debug("Starting routing process: #{inspect(channel_ref)}")
DynamicSupervisor.start_child(__MODULE__, Router.child_spec([name: channel_ref] ++ new_options))
end


@spec route_message(pid(), any, any) :: :ok | {:error, any()}
@spec route_message(channel_ref(), event_name(), message() | message_data()) :: {:ok, map()} | {:error, any()}
@doc """
Request a message delivery by creating a protocol message with the data provided
"""
Expand All @@ -96,6 +145,7 @@ defmodule AdfSenderConnector do
Router.route_message(pid, event_name, message)
end
[] ->
Logger.warning(":unknown_channel_reference #{inspect(channel_ref)}")
{:error, :unknown_channel_reference}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ defmodule AdfSenderConnector.Credentials do
"""

use AdfSenderConnector.Spec

alias AdfSenderConnector.Router

require Logger

@doc """
Expand All @@ -31,7 +30,7 @@ defmodule AdfSenderConnector.Credentials do
case response do
{:error, reason} ->
Logger.error("Error exchanging credentials, #{inspect(reason)}")
_ ->
_ ->
Logger.debug("Credentials exchanged")
response
end
Expand All @@ -50,7 +49,7 @@ defmodule AdfSenderConnector.Credentials do
HTTPoison.post(
Keyword.fetch!(state, :sender_url) <> "/ext/channel/create",
request,
[{"Content-Type", "application/json"}],
[{"content-type", "application/json"}],
parse_http_opts(state)
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule AdfSenderConnector.Message do
@type channel_ref() :: String.t()
@type message_id() :: String.t()
@type correlation_id() :: String.t()
@type message_data() :: iodata()
@type message_data() :: any()
@type event_name() :: String.t()
@type t() :: AdfSenderConnector.Message.t()

Expand Down
59 changes: 47 additions & 12 deletions clients/backend-client-elixir/lib/adf_sender_connector/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,37 @@ defmodule AdfSenderConnector.Router do
alias AdfSenderConnector.Message

@doc """
Requests Channel Sender to route a message, with the indicated event name via the channel_ref.
Requests Channel Sender to route a message, with the indicated event name.
Internally the function will build a Message.
This operation does not wait for the completion of the task.
"""
@spec route_message(pid(), event_name(), any()) :: :ok | {:error, any()}
def route_message(pid, event_name, message) when is_map(message) do
@spec cast_route_message(pid(), event_name(), any()) :: :ok | {:error, any()}
def cast_route_message(pid, event_name, message) when is_map(message) do
GenServer.cast(pid, {:route_message, event_name, message})
end

@doc """
Requests Channel Sender to route a Message.
This operation does not wait for the completion of the task.
"""
@spec cast_route_message(pid(), Message.t()) :: :ok | {:error, any()}
def cast_route_message(pid, message) when is_struct(message) do
GenServer.cast(pid, {:route_message, message})
end

@doc """
Requests Channel Sender to route a message with the indicated event name.
Internally the function will build a Message struct.
"""
def route_message(pid, event_name, message) when is_map(message) do
GenServer.call(pid, {:route_message, event_name, message})
end

@doc """
Requests Channel Sender to route a Message.
"""
@spec route_message(pid(), protocol_message()) :: :ok | {:error, any()}
def route_message(pid, protocol_message) when is_struct(protocol_message) do
GenServer.cast(pid, {:route_message, protocol_message})
def route_message(pid, message) when is_struct(message) do
GenServer.call(pid, {:route_message, message})
end

##########################
Expand All @@ -31,20 +48,38 @@ defmodule AdfSenderConnector.Router do
@doc false
def handle_cast({:route_message, event_name, message}, state) do
build_protocol_msg(Keyword.fetch!(state, :name), message, event_name)
|> build_route_request
|> do_route_msg(state)
|> decode_response
|> build_and_send(state)
{:noreply, state}
end

@doc false
def handle_cast({:route_message, protocol_message}, state) do
%{protocol_message | channel_ref: Keyword.fetch!(state, :name)}
|> build_and_send(state)
{:noreply, state}
end

def handle_call({:route_message, event_name, message}, _from, state) do
{:reply,
build_protocol_msg(Keyword.fetch!(state, :name), message, event_name)
|> build_and_send(state),
state
}
end

def handle_call({:route_message, protocol_message}, _from, state) do
{:reply,
%{protocol_message | channel_ref: Keyword.fetch!(state, :name)}
|> build_and_send(state),
state
}
end

defp build_and_send(p_message, state) do
p_message
|> build_route_request
|> do_route_msg(state)
|> decode_response

{:noreply, state}
end

defp build_protocol_msg(channel_ref, message, event_name) do
Expand All @@ -59,7 +94,7 @@ defmodule AdfSenderConnector.Router do
HTTPoison.post(
Keyword.fetch!(state, :sender_url) <> "/ext/channel/deliver_message",
request,
[{"Content-Type", "application/json"}],
[{"content-type", "application/json"}],
parse_http_opts(state)
)
end
Expand Down
21 changes: 4 additions & 17 deletions clients/backend-client-elixir/lib/adf_sender_connector/spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ defmodule AdfSenderConnector.Spec do
@type correlation_id() :: String.t()
@type event_name() :: String.t()
@type message_data() :: iodata()
@type protocol_message :: %{
channel_ref: channel_ref(),
message_id: message_id(),
correlation_id: correlation_id(),
message_data: message_data(),
event_name: event_name()
}

# inherit server
use GenServer
Expand All @@ -70,16 +63,10 @@ defmodule AdfSenderConnector.Spec do
end

def child_spec(args) do
#case NimbleOptions.validate(args, @args_definition) do
# {:ok, validated_options} ->
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
}
# {:error, reason} ->
# Logger.error("Invalid configuration provided, #{inspect(reason)}")
# raise reason
#end
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
}
end

defp via_tuple(process_alias) do
Expand Down
2 changes: 1 addition & 1 deletion clients/backend-client-elixir/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule AdfSenderConnector.MixProject do
def project do
[
app: :adf_sender_connector,
version: "0.2.1",
version: "0.2.2",
elixir: "~> 1.13",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Code.compiler_options(ignore_module_conflict: true)

defmodule AdfSenderConnector.ChannelTest do
defmodule AdfSenderConnector.CredentialsTest do
use ExUnit.Case
import Mock
alias AdfSenderConnector.Credentials
Expand All @@ -19,7 +19,7 @@ defmodule AdfSenderConnector.ChannelTest do
:ok
end

test "should start channel process" do
test "should start creds process" do

options = [http_opts: [], name: "foo"]

Expand All @@ -40,7 +40,7 @@ defmodule AdfSenderConnector.ChannelTest do

end

test "should start channel process, then should exchange credentials" do
test "should start creds process, then should exchange credentials" do

options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "bar"]

Expand All @@ -63,15 +63,27 @@ defmodule AdfSenderConnector.ChannelTest do

end

# test "should handle fail to request a channel registration" do
# my_http_options = [
# timeout: 10_000, recv_timeout: 10_000, max_connections: 1000
# ]
test "should handle fail to request a channel registration" do

# {:ok, pid} = Channel.start_link([name: :demo2, sender_url: "http://localhost:8082", http_opts: my_http_options])
# response = Channel.create_channel(pid, "a", "b")
# assert {:error, :channel_sender_econnrefused} == response
# Process.exit(pid, :kill)
# end
options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "foo"]

create_response = %HTTPoison.Response{
status_code: 500,
body: "{}"
}

with_mocks([
{HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]}
]) do

{:ok, pid} = Credentials.start_link({:sender_url, "http://localhost:8888"}, options)
assert is_pid(pid)

assert {:error, :channel_sender_unknown_error} == Credentials.exchange_credentials(pid)

Process.exit(pid, :normal)
end

end

end
Loading

0 comments on commit 433b65e

Please sign in to comment.