diff --git a/clients/backend-client-elixir/lib/adf_sender_connector.ex b/clients/backend-client-elixir/lib/adf_sender_connector.ex index 8646c27..de4f1e6 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector.ex @@ -8,8 +8,45 @@ defmodule AdfSenderConnector do alias AdfSenderConnector.{Credentials, Router, Message} + @typedoc """ + Channel sender base URL + """ + @type sender_url :: String.t() + + @typedoc """ + Application reference + """ + @type application_ref :: String.t() + + @typedoc """ + User reference + """ + @type user_ref :: String.t() + + @typedoc """ + Channel reference + """ + @type channel_ref :: String.t() + + @typedoc """ + Event name + """ + @type event_name :: String.t() + + @typedoc """ + Event payload as a Message struct + """ + @type message :: %Message{} + + + @typedoc """ + Event payload as a Map + """ + @type message_data :: map() + @default_local "http://localhost:8081" + @doc """ starts the process """ @@ -21,7 +58,7 @@ defmodule AdfSenderConnector do def start_link() do HTTPoison.start - Logger.warn("No sender endpoint provided. Using default endpoint https://localhost:8081") + Logger.warning("No sender endpoint provided. Using default endpoint https://localhost:8081") DynamicSupervisor.start_link(__MODULE__, [sender_url: @default_local], name: __MODULE__) end @@ -53,37 +90,49 @@ defmodule AdfSenderConnector do } end - @spec channel_registration(any, any, any) :: {:ok, any()} | {:error, any()} + @spec channel_registration(application_ref(), user_ref(), list()) :: {:ok, map()} | {:error, any()} @doc """ Request a channel registration """ def channel_registration(application_ref, user_ref, options \\ []) do - new_ch = DynamicSupervisor.start_child(__MODULE__, + case find_creds_process(application_ref <> "." <> user_ref) do + :noproc -> + {:ok, pid} = start_creds_proc(application_ref, user_ref, options) + Credentials.exchange_credentials(pid) + pid -> + Credentials.exchange_credentials(pid) + end + end + + defp start_creds_proc(application_ref, user_ref, options) do + DynamicSupervisor.start_child(__MODULE__, Credentials.child_spec([ app_ref: application_ref, user_ref: user_ref, name: application_ref <> "." <> user_ref] ++ options)) + end - case new_ch do - {:ok, pid} -> - Credentials.exchange_credentials(pid) - _ -> - new_ch + defp find_creds_process(name) do + case Registry.lookup(Registry.ADFSenderConnector, name) do + [{pid, _}] -> + pid + [] -> + :noproc end end - @doc """ Starts a process to deliver messages. """ - @spec start_router_process(any, any) :: :ok | {:error, any()} + @spec start_router_process(channel_ref(), list()) :: :ok | {:error, any()} def start_router_process(channel_ref, options \\ []) do new_options = Keyword.delete(options, :name) - DynamicSupervisor.start_child(AdfSenderConnector, AdfSenderConnector.Router.child_spec([name: channel_ref] ++ new_options)) + Logger.debug("Starting routing process: #{inspect(channel_ref)}") + DynamicSupervisor.start_child(__MODULE__, Router.child_spec([name: channel_ref] ++ new_options)) end - @spec route_message(pid(), any, any) :: :ok | {:error, any()} + @spec route_message(channel_ref(), event_name(), message() | message_data()) :: {:ok, map()} | {:error, any()} @doc """ Request a message delivery by creating a protocol message with the data provided """ @@ -96,6 +145,7 @@ defmodule AdfSenderConnector do Router.route_message(pid, event_name, message) end [] -> + Logger.warning(":unknown_channel_reference #{inspect(channel_ref)}") {:error, :unknown_channel_reference} end end diff --git a/clients/backend-client-elixir/lib/adf_sender_connector/credentials.ex b/clients/backend-client-elixir/lib/adf_sender_connector/credentials.ex index d76956d..2ae026a 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector/credentials.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector/credentials.ex @@ -4,8 +4,7 @@ defmodule AdfSenderConnector.Credentials do """ use AdfSenderConnector.Spec - - alias AdfSenderConnector.Router + require Logger @doc """ @@ -31,7 +30,7 @@ defmodule AdfSenderConnector.Credentials do case response do {:error, reason} -> Logger.error("Error exchanging credentials, #{inspect(reason)}") - _ -> + _ -> Logger.debug("Credentials exchanged") response end @@ -50,7 +49,7 @@ defmodule AdfSenderConnector.Credentials do HTTPoison.post( Keyword.fetch!(state, :sender_url) <> "/ext/channel/create", request, - [{"Content-Type", "application/json"}], + [{"content-type", "application/json"}], parse_http_opts(state) ) end diff --git a/clients/backend-client-elixir/lib/adf_sender_connector/message.ex b/clients/backend-client-elixir/lib/adf_sender_connector/message.ex index 2d73996..a5db692 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector/message.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector/message.ex @@ -9,7 +9,7 @@ defmodule AdfSenderConnector.Message do @type channel_ref() :: String.t() @type message_id() :: String.t() @type correlation_id() :: String.t() - @type message_data() :: iodata() + @type message_data() :: any() @type event_name() :: String.t() @type t() :: AdfSenderConnector.Message.t() diff --git a/clients/backend-client-elixir/lib/adf_sender_connector/router.ex b/clients/backend-client-elixir/lib/adf_sender_connector/router.ex index d97300e..4bf6312 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector/router.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector/router.ex @@ -8,20 +8,37 @@ defmodule AdfSenderConnector.Router do alias AdfSenderConnector.Message @doc """ - Requests Channel Sender to route a message, with the indicated event name via the channel_ref. + Requests Channel Sender to route a message, with the indicated event name. Internally the function will build a Message. + This operation does not wait for the completion of the task. """ - @spec route_message(pid(), event_name(), any()) :: :ok | {:error, any()} - def route_message(pid, event_name, message) when is_map(message) do + @spec cast_route_message(pid(), event_name(), any()) :: :ok | {:error, any()} + def cast_route_message(pid, event_name, message) when is_map(message) do GenServer.cast(pid, {:route_message, event_name, message}) end + @doc """ + Requests Channel Sender to route a Message. + This operation does not wait for the completion of the task. + """ + @spec cast_route_message(pid(), Message.t()) :: :ok | {:error, any()} + def cast_route_message(pid, message) when is_struct(message) do + GenServer.cast(pid, {:route_message, message}) + end + + @doc """ + Requests Channel Sender to route a message with the indicated event name. + Internally the function will build a Message struct. + """ + def route_message(pid, event_name, message) when is_map(message) do + GenServer.call(pid, {:route_message, event_name, message}) + end + @doc """ Requests Channel Sender to route a Message. """ - @spec route_message(pid(), protocol_message()) :: :ok | {:error, any()} - def route_message(pid, protocol_message) when is_struct(protocol_message) do - GenServer.cast(pid, {:route_message, protocol_message}) + def route_message(pid, message) when is_struct(message) do + GenServer.call(pid, {:route_message, message}) end ########################## @@ -31,20 +48,38 @@ defmodule AdfSenderConnector.Router do @doc false def handle_cast({:route_message, event_name, message}, state) do build_protocol_msg(Keyword.fetch!(state, :name), message, event_name) - |> build_route_request - |> do_route_msg(state) - |> decode_response + |> build_and_send(state) {:noreply, state} end @doc false def handle_cast({:route_message, protocol_message}, state) do %{protocol_message | channel_ref: Keyword.fetch!(state, :name)} + |> build_and_send(state) + {:noreply, state} + end + + def handle_call({:route_message, event_name, message}, _from, state) do + {:reply, + build_protocol_msg(Keyword.fetch!(state, :name), message, event_name) + |> build_and_send(state), + state + } + end + + def handle_call({:route_message, protocol_message}, _from, state) do + {:reply, + %{protocol_message | channel_ref: Keyword.fetch!(state, :name)} + |> build_and_send(state), + state + } + end + + defp build_and_send(p_message, state) do + p_message |> build_route_request |> do_route_msg(state) |> decode_response - - {:noreply, state} end defp build_protocol_msg(channel_ref, message, event_name) do @@ -59,7 +94,7 @@ defmodule AdfSenderConnector.Router do HTTPoison.post( Keyword.fetch!(state, :sender_url) <> "/ext/channel/deliver_message", request, - [{"Content-Type", "application/json"}], + [{"content-type", "application/json"}], parse_http_opts(state) ) end diff --git a/clients/backend-client-elixir/lib/adf_sender_connector/spec.ex b/clients/backend-client-elixir/lib/adf_sender_connector/spec.ex index 6963a63..0406d8a 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector/spec.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector/spec.ex @@ -46,13 +46,6 @@ defmodule AdfSenderConnector.Spec do @type correlation_id() :: String.t() @type event_name() :: String.t() @type message_data() :: iodata() - @type protocol_message :: %{ - channel_ref: channel_ref(), - message_id: message_id(), - correlation_id: correlation_id(), - message_data: message_data(), - event_name: event_name() - } # inherit server use GenServer @@ -70,16 +63,10 @@ defmodule AdfSenderConnector.Spec do end def child_spec(args) do - #case NimbleOptions.validate(args, @args_definition) do - # {:ok, validated_options} -> - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [args]}, - } - # {:error, reason} -> - # Logger.error("Invalid configuration provided, #{inspect(reason)}") - # raise reason - #end + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + } end defp via_tuple(process_alias) do diff --git a/clients/backend-client-elixir/mix.exs b/clients/backend-client-elixir/mix.exs index 5926dc4..44b4e40 100644 --- a/clients/backend-client-elixir/mix.exs +++ b/clients/backend-client-elixir/mix.exs @@ -4,7 +4,7 @@ defmodule AdfSenderConnector.MixProject do def project do [ app: :adf_sender_connector, - version: "0.2.1", + version: "0.2.2", elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/clients/backend-client-elixir/test/adf_sender_connector/channel_test.exs b/clients/backend-client-elixir/test/adf_sender_connector/credentials_test.exs similarity index 64% rename from clients/backend-client-elixir/test/adf_sender_connector/channel_test.exs rename to clients/backend-client-elixir/test/adf_sender_connector/credentials_test.exs index a302e43..6de324c 100644 --- a/clients/backend-client-elixir/test/adf_sender_connector/channel_test.exs +++ b/clients/backend-client-elixir/test/adf_sender_connector/credentials_test.exs @@ -1,6 +1,6 @@ Code.compiler_options(ignore_module_conflict: true) -defmodule AdfSenderConnector.ChannelTest do +defmodule AdfSenderConnector.CredentialsTest do use ExUnit.Case import Mock alias AdfSenderConnector.Credentials @@ -19,7 +19,7 @@ defmodule AdfSenderConnector.ChannelTest do :ok end - test "should start channel process" do + test "should start creds process" do options = [http_opts: [], name: "foo"] @@ -40,7 +40,7 @@ defmodule AdfSenderConnector.ChannelTest do end - test "should start channel process, then should exchange credentials" do + test "should start creds process, then should exchange credentials" do options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "bar"] @@ -63,15 +63,27 @@ defmodule AdfSenderConnector.ChannelTest do end - # test "should handle fail to request a channel registration" do - # my_http_options = [ - # timeout: 10_000, recv_timeout: 10_000, max_connections: 1000 - # ] + test "should handle fail to request a channel registration" do - # {:ok, pid} = Channel.start_link([name: :demo2, sender_url: "http://localhost:8082", http_opts: my_http_options]) - # response = Channel.create_channel(pid, "a", "b") - # assert {:error, :channel_sender_econnrefused} == response - # Process.exit(pid, :kill) - # end + options = [http_opts: [], app_ref: "app", user_ref: "user1", name: "foo"] + + create_response = %HTTPoison.Response{ + status_code: 500, + body: "{}" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} + ]) do + + {:ok, pid} = Credentials.start_link({:sender_url, "http://localhost:8888"}, options) + assert is_pid(pid) + + assert {:error, :channel_sender_unknown_error} == Credentials.exchange_credentials(pid) + + Process.exit(pid, :normal) + end + + end end diff --git a/clients/backend-client-elixir/test/adf_sender_connector/router_test.exs b/clients/backend-client-elixir/test/adf_sender_connector/router_test.exs index 2fcde70..a007835 100644 --- a/clients/backend-client-elixir/test/adf_sender_connector/router_test.exs +++ b/clients/backend-client-elixir/test/adf_sender_connector/router_test.exs @@ -2,7 +2,9 @@ Code.compiler_options(ignore_module_conflict: true) defmodule AdfSenderConnector.RouterTest do use ExUnit.Case + import Mock + alias AdfSenderConnector.Message alias AdfSenderConnector.Router @moduletag :capture_log @@ -18,15 +20,84 @@ defmodule AdfSenderConnector.RouterTest do {:ok, pid} = Router.start_link({:sender_url, "http://localhost:8082"}, [http_opts: [], - name: "bar.refX"]) + name: "router_tests"]) assert is_pid(pid) %{"process" => pid} end - test "should route message", context do - assert :ok == Router.route_message(Map.fetch!(context, "process"), "my_event_name", %{}) + test "should route map message", context do + + route_response = %HTTPoison.Response{ + status_code: 200, + body: "{ \"result\": \"Ok\" }" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, route_response} end]} + ]) do + assert {:ok, %{"result" => "Ok"}} == Router.route_message(Map.fetch!(context, "process"), "my_event_name", %{}) + + end + + # Process.exit(pid, :kill) + end + + test "should route struct message", context do + + route_response = %HTTPoison.Response{ + status_code: 200, + body: "{ \"result\": \"Ok\" }" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, route_response} end]} + ]) do + + msg = Message.new("bar.refX", %{}, "my_event_name") + + assert {:ok, %{"result" => "Ok"}} == Router.route_message(Map.fetch!(context, "process"), "my_event_name", msg) + + end + + # Process.exit(pid, :kill) + end + + test "should route map message - cast", context do + + route_response = %HTTPoison.Response{ + status_code: 200, + body: "{ \"result\": \"Ok\" }" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, route_response} end]} + ]) do + assert :ok == Router.cast_route_message(Map.fetch!(context, "process"), "my_event_name", %{}) + + end + + # Process.exit(pid, :kill) + end + + test "should route struct message - cast", context do + + route_response = %HTTPoison.Response{ + status_code: 200, + body: "{ \"result\": \"Ok\" }" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, route_response} end]} + ]) do + + msg = Message.new("bar.refX", %{}, "my_event_name") + + assert :ok == Router.cast_route_message(Map.fetch!(context, "process"), "my_event_name", msg) + + end + # Process.exit(pid, :kill) end diff --git a/clients/backend-client-elixir/test/adf_sender_connector/spec_test.exs b/clients/backend-client-elixir/test/adf_sender_connector/spec_test.exs index be0596f..7912412 100644 --- a/clients/backend-client-elixir/test/adf_sender_connector/spec_test.exs +++ b/clients/backend-client-elixir/test/adf_sender_connector/spec_test.exs @@ -17,20 +17,24 @@ defmodule AdfSenderConnector.SpecTest do end test "should start process" do - {:ok, pid} = FakeImplementor.start_link({:sender_url, "http://localhost:8082"}, [name: :demospec]) + options = [http_opts: [], name: "foo"] + + {:ok, pid} = FakeImplementor.start_link({:sender_url, "http://localhost:8888"}, options) + assert is_pid(pid) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "should start process passing opts" do my_http_options = [ hackney: [:insecure, pool: :some_pool], - timeout: 10_000, recv_timeout: 10_000, max_connections: 1000 + timeout: 10_000, recv_timeout: 10_000, max_connections: 1000, + name: "bar" ] - {:ok, pid} = FakeImplementor.start_link({:sender_url, "http://localhost:8082"}, [name: :demospec2, http_opts: my_http_options]) + {:ok, pid} = FakeImplementor.start_link({:sender_url, "http://localhost:8888"}, my_http_options) assert is_pid(pid) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end diff --git a/clients/backend-client-elixir/test/adf_sender_connector_test.exs b/clients/backend-client-elixir/test/adf_sender_connector_test.exs index e18f757..6771d3e 100644 --- a/clients/backend-client-elixir/test/adf_sender_connector_test.exs +++ b/clients/backend-client-elixir/test/adf_sender_connector_test.exs @@ -63,7 +63,7 @@ defmodule AdfSenderConnectorTest do {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} ]) do assert {:ok, %{"channel_ref" => "dummy.channel.ref2", "channel_secret" => "yyy2"}} - = AdfSenderConnector.channel_registration("a2", "b2", options) + == AdfSenderConnector.channel_registration("a2", "b2", options) end ### then create a process to map that name @@ -81,10 +81,10 @@ defmodule AdfSenderConnectorTest do # route a protocol message message = Message.new("dummy.channel.ref2", %{"hello" => "world"}, "evt1") - assert :ok = AdfSenderConnector.route_message("dummy.channel.ref2", "evt1", message) + assert {:ok, %{"result" => "Ok"}} == AdfSenderConnector.route_message("dummy.channel.ref2", "evt1", message) # route data represented as a Map - assert :ok = AdfSenderConnector.route_message("dummy.channel.ref2", "evt1", %{"hello" => "world"}) + assert {:ok, %{"result" => "Ok"}} == AdfSenderConnector.route_message("dummy.channel.ref2", "evt1", %{"hello" => "world"}) end end @@ -103,7 +103,7 @@ defmodule AdfSenderConnectorTest do {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} ]) do assert {:ok, %{"channel_ref" => "dummy.channel.ref3", "channel_secret" => "yyy3"}} - = AdfSenderConnector.channel_registration("a3", "b3", options) + == AdfSenderConnector.channel_registration("a3", "b3", options) end ### then create a process to map that name @@ -120,7 +120,7 @@ defmodule AdfSenderConnectorTest do ]) do message = Message.new("dummy.channel.ref3", %{"hello" => "world"}, "evt1") - assert :ok = AdfSenderConnector.route_message("dummy.channel.ref3", "evt1", message) + assert {:error, :channel_sender_unknown_error} == AdfSenderConnector.route_message("dummy.channel.ref3", "evt1", message) end end