diff --git a/config/test.exs b/config/test.exs index ea059db..46bc2b2 100644 --- a/config/test.exs +++ b/config/test.exs @@ -11,7 +11,7 @@ config :ex_unit, config :extreme, TestConn, db_type: "node", - host: "localhost", + host: System.get_env("EVENTSTORE_HOST") || "localhost", port: "1113", username: "admin", password: "changeit", diff --git a/lib/extreme/request_manager.ex b/lib/extreme/request_manager.ex index a7e9cc5..5c00565 100644 --- a/lib/extreme/request_manager.ex +++ b/lib/extreme/request_manager.ex @@ -183,6 +183,8 @@ defmodule Extreme.RequestManager do end def handle_call({:read_and_stay_subscribed, subscriber, read_params}, from, %State{} = state) do + # IO.puts(":read_and_stay_subscribed") + _start_subscription(self(), from, state.base_name, fn correlation_id -> Extreme.SubscriptionsSupervisor.start_subscription( state.base_name, @@ -221,6 +223,7 @@ defmodule Extreme.RequestManager do {:ok, subscription} = fun.(correlation_id) + # Process.sleep(100) GenServer.cast(req_manager, {:register_subscription, correlation_id, subscription}) GenServer.reply(from, {:ok, subscription}) @@ -244,12 +247,38 @@ defmodule Extreme.RequestManager do end def handle_cast({:process_server_message, message}, %State{} = state) do + parsed = Response.parse(message) + + cond do + elem(parsed, 0) in [:client_identified, :heartbeat_request] -> + :ok + + elem(parsed, 2).__struct__ in [Extreme.Messages.SubscriptionConfirmation] -> + # IO.puts(":process_server_message SubscriptionConfirmation #{inspect(parsed)}") + :ok + + true -> + :ok + end + + # if elem(parsed, 0) not in [:client_identified, :heartbeat_request] do + # IO.puts(":process_server_message, #{inspect(elem(parsed, 2).__struct__)}") + # end + correlation_id = message |> Response.get_correlation_id() - state.subscriptions[correlation_id] - |> _process_server_message(message, state) + sub = state.subscriptions[correlation_id] + + if elem(parsed, 0) not in [:client_identified, :heartbeat_request] and + elem(parsed, 2).__struct__ == Extreme.Messages.StreamEventAppeared and is_nil(sub) do + IO.puts( + ":process_server_message StreamEventAppeared correlation #{inspect(correlation_id)}, subscription #{inspect(sub)}" + ) + end + + sub |> _process_server_message(message, state) {:noreply, state} end @@ -276,6 +305,7 @@ defmodule Extreme.RequestManager do end def handle_cast({:register_subscription, correlation_id, subscription}, %State{} = state) do + IO.puts(":register_subscription") subscriptions = Map.put(state.subscriptions, correlation_id, subscription) {:noreply, %State{state | subscriptions: subscriptions}} end @@ -305,6 +335,8 @@ defmodule Extreme.RequestManager do # message is response to pending request defp _process_server_message(nil, message, state) do + # IO.puts("_process_server_message(nil)") + _in_task(state.base_name, fn -> message |> Response.parse() @@ -313,8 +345,11 @@ defmodule Extreme.RequestManager do end # message is for subscription, decoding needs to be done there so we keep the order of incoming messages - defp _process_server_message(subscription, message, _state), - do: GenServer.cast(subscription, {:process_push, fn -> Response.parse(message) end}) + defp _process_server_message(subscription, message, _state) do + # IO.puts("_process_server_message(#{inspect(subscription)})") + + GenServer.cast(subscription, {:process_push, fn -> Response.parse(message) end}) + end defp _respond_on({:client_identified, _correlation_id}, _), do: :ok diff --git a/test/subscriptions_test.exs b/test/subscriptions_test.exs index 01ee99a..4a09f54 100644 --- a/test/subscriptions_test.exs +++ b/test/subscriptions_test.exs @@ -48,11 +48,13 @@ defmodule ExtremeSubscriptionsTest do end def handle_call({:on_event, event} = message, _from, state) do + # IO.puts("Subscriber {:on_event, event} (#{inspect(message)})") send(state.sender, message) {:reply, :ok, %{state | received: [event | state.received]}} end def handle_call({:on_event, event, _correlation_id} = message, _from, state) do + # IO.puts("Subscriber {:on_event, event, _correlation_id} (#{inspect(message)})") send(state.sender, message) {:reply, :ok, %{state | received: [event | state.received]}} end @@ -528,30 +530,43 @@ defmodule ExtremeSubscriptionsTest do test "events written while reading stream are also pushed to client in correct order" do stream = Helpers.random_stream_name() - num_events = 1_000 + num_events = 200 # prepopulate stream events1 = 1..num_events |> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x}"} end) + # events2 = + # 1..num_events + # |> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}", stuff: Enum.shuffle(10000..20000) |> Enum.take(1)} end) + events2 = 1..num_events |> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}"} end) - {:ok, %ExMsg.WriteEventsCompleted{}} = - TestConn.execute(Helpers.write_events(stream, events1)) + Enum.each(events1, fn e -> + {:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream, [e])) + end) + + spawn(fn -> + Enum.each(events2, fn e -> + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [e])) + end) + + Logger.debug("Second pack of events written") + end) + + # Process.sleep(100) # subscribe to existing stream {:ok, subscriber} = Subscriber.start_link() - {:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2) + # TODO make it more likely or guaranteed that the race would fail - spawn(fn -> - {:ok, %ExMsg.WriteEventsCompleted{}} = - TestConn.execute(Helpers.write_events(stream, events2)) + {:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2) - Logger.debug("Second pack of events written") - end) + Logger.debug("Second pack of events written") # assert first events are received for _ <- 1..num_events, do: assert_receive({:on_event, _event}) @@ -559,7 +574,7 @@ defmodule ExtremeSubscriptionsTest do Logger.debug("First pack of events received") # assert second pack of events is received as well - for _ <- 1..num_events, do: assert_receive({:on_event, _event}) + for _ <- 1..num_events, do: assert_receive({:on_event, _event}, 1_000) # assert :caught_up is received when existing events are read assert_receive :caught_up diff --git a/test/support/helpers.ex b/test/support/helpers.ex index 9b72c93..648b2b0 100644 --- a/test/support/helpers.ex +++ b/test/support/helpers.ex @@ -1,5 +1,5 @@ defmodule ExtremeTest.Events do - defmodule(PersonCreated, do: defstruct([:name])) + defmodule(PersonCreated, do: defstruct([:name, :stuff])) defmodule(PersonChangedName, do: defstruct([:name])) defmodule(SlowProcessingEventHappened, do: defstruct([:sleep])) end