Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: elixir client improvements #64

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions clients/backend-client-elixir/lib/adf_sender_connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,45 @@ defmodule AdfSenderConnector do

alias AdfSenderConnector.{Credentials, Router, Message}

@typedoc """
Channel sender base URL
"""
@type sender_url :: String.t()

@typedoc """
Application reference
"""
@type application_ref :: String.t()

@typedoc """
User reference
"""
@type user_ref :: String.t()

@typedoc """
Channel reference
"""
@type channel_ref :: String.t()

@typedoc """
Event name
"""
@type event_name :: String.t()

@typedoc """
Event payload as a Message struct
"""
@type message :: %Message{}


@typedoc """
Event payload as a Map
"""
@type message_data :: map()

@default_local "http://localhost:8081"


@doc """
starts the process
"""
Expand All @@ -21,7 +58,7 @@ defmodule AdfSenderConnector do

def start_link() do
HTTPoison.start
Logger.warn("No sender endpoint provided. Using default endpoint https://localhost:8081")
Logger.warning("No sender endpoint provided. Using default endpoint https://localhost:8081")
DynamicSupervisor.start_link(__MODULE__, [sender_url: @default_local], name: __MODULE__)
end

Expand Down Expand Up @@ -53,37 +90,49 @@ defmodule AdfSenderConnector do
}
end

@spec channel_registration(any, any, any) :: {:ok, any()} | {:error, any()}
@spec channel_registration(application_ref(), user_ref(), list()) :: {:ok, map()} | {:error, any()}
@doc """
Request a channel registration
"""
def channel_registration(application_ref, user_ref, options \\ []) do
new_ch = DynamicSupervisor.start_child(__MODULE__,
case find_creds_process(application_ref <> "." <> user_ref) do
:noproc ->
{:ok, pid} = start_creds_proc(application_ref, user_ref, options)
Credentials.exchange_credentials(pid)
pid ->
Credentials.exchange_credentials(pid)
end
end

defp start_creds_proc(application_ref, user_ref, options) do
DynamicSupervisor.start_child(__MODULE__,
Credentials.child_spec([
app_ref: application_ref,
user_ref: user_ref,
name: application_ref <> "." <> user_ref] ++ options))
end

case new_ch do
{:ok, pid} ->
Credentials.exchange_credentials(pid)
_ ->
new_ch
defp find_creds_process(name) do
case Registry.lookup(Registry.ADFSenderConnector, name) do
[{pid, _}] ->
pid
[] ->
:noproc
end
end


@doc """
Starts a process to deliver messages.
"""
@spec start_router_process(any, any) :: :ok | {:error, any()}
@spec start_router_process(channel_ref(), list()) :: :ok | {:error, any()}
def start_router_process(channel_ref, options \\ []) do
new_options = Keyword.delete(options, :name)
DynamicSupervisor.start_child(AdfSenderConnector, AdfSenderConnector.Router.child_spec([name: channel_ref] ++ new_options))
Logger.debug("Starting routing process: #{inspect(channel_ref)}")
DynamicSupervisor.start_child(__MODULE__, Router.child_spec([name: channel_ref] ++ new_options))
end


@spec route_message(pid(), any, any) :: :ok | {:error, any()}
@spec route_message(channel_ref(), event_name(), message() | message_data()) :: {:ok, map()} | {:error, any()}
@doc """
Request a message delivery by creating a protocol message with the data provided
"""
Expand All @@ -96,6 +145,7 @@ defmodule AdfSenderConnector do
Router.route_message(pid, event_name, message)
end
[] ->
Logger.warning(":unknown_channel_reference #{inspect(channel_ref)}")
{:error, :unknown_channel_reference}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ defmodule AdfSenderConnector.Credentials do
"""

use AdfSenderConnector.Spec

alias AdfSenderConnector.Router

require Logger

@doc """
Expand All @@ -31,7 +30,7 @@ defmodule AdfSenderConnector.Credentials do
case response do
{:error, reason} ->
Logger.error("Error exchanging credentials, #{inspect(reason)}")
_ ->
_ ->
Logger.debug("Credentials exchanged")
response
end
Expand All @@ -50,7 +49,7 @@ defmodule AdfSenderConnector.Credentials do
HTTPoison.post(
Keyword.fetch!(state, :sender_url) <> "/ext/channel/create",
request,
[{"Content-Type", "application/json"}],
[{"content-type", "application/json"}],
parse_http_opts(state)
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule AdfSenderConnector.Message do
@type channel_ref() :: String.t()
@type message_id() :: String.t()
@type correlation_id() :: String.t()
@type message_data() :: iodata()
@type message_data() :: any()
@type event_name() :: String.t()
@type t() :: AdfSenderConnector.Message.t()

Expand Down
59 changes: 47 additions & 12 deletions clients/backend-client-elixir/lib/adf_sender_connector/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,37 @@ defmodule AdfSenderConnector.Router do
alias AdfSenderConnector.Message

@doc """
Requests Channel Sender to route a message, with the indicated event name via the channel_ref.
Requests Channel Sender to route a message, with the indicated event name.
Internally the function will build a Message.
This operation does not wait for the completion of the task.
"""
@spec route_message(pid(), event_name(), any()) :: :ok | {:error, any()}
def route_message(pid, event_name, message) when is_map(message) do
@spec cast_route_message(pid(), event_name(), any()) :: :ok | {:error, any()}
def cast_route_message(pid, event_name, message) when is_map(message) do
GenServer.cast(pid, {:route_message, event_name, message})
end

@doc """
Requests Channel Sender to route a Message.
This operation does not wait for the completion of the task.
"""
@spec cast_route_message(pid(), Message.t()) :: :ok | {:error, any()}
def cast_route_message(pid, message) when is_struct(message) do
GenServer.cast(pid, {:route_message, message})
end

@doc """
Requests Channel Sender to route a message with the indicated event name.
Internally the function will build a Message struct.
"""
def route_message(pid, event_name, message) when is_map(message) do
GenServer.call(pid, {:route_message, event_name, message})
end

@doc """
Requests Channel Sender to route a Message.
"""
@spec route_message(pid(), protocol_message()) :: :ok | {:error, any()}
def route_message(pid, protocol_message) when is_struct(protocol_message) do
GenServer.cast(pid, {:route_message, protocol_message})
def route_message(pid, message) when is_struct(message) do
GenServer.call(pid, {:route_message, message})
end

##########################
Expand All @@ -31,20 +48,38 @@ defmodule AdfSenderConnector.Router do
@doc false
def handle_cast({:route_message, event_name, message}, state) do
build_protocol_msg(Keyword.fetch!(state, :name), message, event_name)
|> build_route_request
|> do_route_msg(state)
|> decode_response
|> build_and_send(state)
{:noreply, state}
end

@doc false
def handle_cast({:route_message, protocol_message}, state) do
%{protocol_message | channel_ref: Keyword.fetch!(state, :name)}
|> build_and_send(state)
{:noreply, state}
end

def handle_call({:route_message, event_name, message}, _from, state) do
{:reply,
build_protocol_msg(Keyword.fetch!(state, :name), message, event_name)
|> build_and_send(state),
state
}
end

def handle_call({:route_message, protocol_message}, _from, state) do
{:reply,
%{protocol_message | channel_ref: Keyword.fetch!(state, :name)}
|> build_and_send(state),
state
}
end

defp build_and_send(p_message, state) do
p_message
|> build_route_request
|> do_route_msg(state)
|> decode_response

{:noreply, state}
end

defp build_protocol_msg(channel_ref, message, event_name) do
Expand All @@ -59,7 +94,7 @@ defmodule AdfSenderConnector.Router do
HTTPoison.post(
Keyword.fetch!(state, :sender_url) <> "/ext/channel/deliver_message",
request,
[{"Content-Type", "application/json"}],
[{"content-type", "application/json"}],
parse_http_opts(state)
)
end
Expand Down
21 changes: 4 additions & 17 deletions clients/backend-client-elixir/lib/adf_sender_connector/spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ defmodule AdfSenderConnector.Spec do
@type correlation_id() :: String.t()
@type event_name() :: String.t()
@type message_data() :: iodata()
@type protocol_message :: %{
channel_ref: channel_ref(),
message_id: message_id(),
correlation_id: correlation_id(),
message_data: message_data(),
event_name: event_name()
}

# inherit server
use GenServer
Expand All @@ -70,16 +63,10 @@ defmodule AdfSenderConnector.Spec do
end

def child_spec(args) do
#case NimbleOptions.validate(args, @args_definition) do
# {:ok, validated_options} ->
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
}
# {:error, reason} ->
# Logger.error("Invalid configuration provided, #{inspect(reason)}")
# raise reason
#end
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
}
end

defp via_tuple(process_alias) do
Expand Down
2 changes: 1 addition & 1 deletion clients/backend-client-elixir/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule AdfSenderConnector.MixProject do
def project do
[
app: :adf_sender_connector,
version: "0.2.1",
version: "0.2.2",
elixir: "~> 1.13",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Code.compiler_options(ignore_module_conflict: true)

defmodule AdfSenderConnector.ChannelTest do
defmodule AdfSenderConnector.CredentialsTest do
use ExUnit.Case
import Mock
alias AdfSenderConnector.Credentials
Expand All @@ -19,7 +19,7 @@ defmodule AdfSenderConnector.ChannelTest do
:ok
end

test "should start channel process" do
test "should start creds process" do

options = [http_opts: [], name: "foo"]

Expand All @@ -40,7 +40,7 @@ defmodule AdfSenderConnector.ChannelTest do

end

test "should start channel process, then should exchange credentials" do
test "should start creds process, then should exchange credentials" do

options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "bar"]

Expand All @@ -63,15 +63,27 @@ defmodule AdfSenderConnector.ChannelTest do

end

# test "should handle fail to request a channel registration" do
# my_http_options = [
# timeout: 10_000, recv_timeout: 10_000, max_connections: 1000
# ]
test "should handle fail to request a channel registration" do

# {:ok, pid} = Channel.start_link([name: :demo2, sender_url: "http://localhost:8082", http_opts: my_http_options])
# response = Channel.create_channel(pid, "a", "b")
# assert {:error, :channel_sender_econnrefused} == response
# Process.exit(pid, :kill)
# end
options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "foo"]

create_response = %HTTPoison.Response{
status_code: 500,
body: "{}"
}

with_mocks([
{HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]}
]) do

{:ok, pid} = Credentials.start_link({:sender_url, "http://localhost:8888"}, options)
assert is_pid(pid)

assert {:error, :channel_sender_unknown_error} == Credentials.exchange_credentials(pid)

Process.exit(pid, :normal)
end

end

end
Loading