Skip to content

Commit

Permalink
Try to build a reliably failing test
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyvanriet committed Jan 30, 2024
1 parent 5e42ff9 commit adc8b31
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 16 deletions.
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ config :ex_unit,

config :extreme, TestConn,
db_type: "node",
host: "localhost",
host: System.get_env("EVENTSTORE_HOST") || "localhost",
port: "1113",
username: "admin",
password: "changeit",
Expand Down
43 changes: 39 additions & 4 deletions lib/extreme/request_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ defmodule Extreme.RequestManager do
end

def handle_call({:read_and_stay_subscribed, subscriber, read_params}, from, %State{} = state) do
# IO.puts(":read_and_stay_subscribed")

_start_subscription(self(), from, state.base_name, fn correlation_id ->
Extreme.SubscriptionsSupervisor.start_subscription(
state.base_name,
Expand Down Expand Up @@ -221,6 +223,7 @@ defmodule Extreme.RequestManager do

{:ok, subscription} = fun.(correlation_id)

# Process.sleep(100)
GenServer.cast(req_manager, {:register_subscription, correlation_id, subscription})

GenServer.reply(from, {:ok, subscription})
Expand All @@ -244,12 +247,38 @@ defmodule Extreme.RequestManager do
end

def handle_cast({:process_server_message, message}, %State{} = state) do
parsed = Response.parse(message)

cond do
elem(parsed, 0) in [:client_identified, :heartbeat_request] ->
:ok

elem(parsed, 2).__struct__ in [Extreme.Messages.SubscriptionConfirmation] ->
# IO.puts(":process_server_message SubscriptionConfirmation #{inspect(parsed)}")
:ok

true ->
:ok
end

# if elem(parsed, 0) not in [:client_identified, :heartbeat_request] do
# IO.puts(":process_server_message, #{inspect(elem(parsed, 2).__struct__)}")
# end

correlation_id =
message
|> Response.get_correlation_id()

state.subscriptions[correlation_id]
|> _process_server_message(message, state)
sub = state.subscriptions[correlation_id]

if elem(parsed, 0) not in [:client_identified, :heartbeat_request] and
elem(parsed, 2).__struct__ == Extreme.Messages.StreamEventAppeared and is_nil(sub) do
IO.puts(
":process_server_message StreamEventAppeared correlation #{inspect(correlation_id)}, subscription #{inspect(sub)}"
)
end

sub |> _process_server_message(message, state)

{:noreply, state}
end
Expand All @@ -276,6 +305,7 @@ defmodule Extreme.RequestManager do
end

def handle_cast({:register_subscription, correlation_id, subscription}, %State{} = state) do
IO.puts(":register_subscription")
subscriptions = Map.put(state.subscriptions, correlation_id, subscription)
{:noreply, %State{state | subscriptions: subscriptions}}
end
Expand Down Expand Up @@ -305,6 +335,8 @@ defmodule Extreme.RequestManager do

# message is response to pending request
defp _process_server_message(nil, message, state) do
# IO.puts("_process_server_message(nil)")

_in_task(state.base_name, fn ->
message
|> Response.parse()
Expand All @@ -313,8 +345,11 @@ defmodule Extreme.RequestManager do
end

# message is for subscription, decoding needs to be done there so we keep the order of incoming messages
defp _process_server_message(subscription, message, _state),
do: GenServer.cast(subscription, {:process_push, fn -> Response.parse(message) end})
defp _process_server_message(subscription, message, _state) do
# IO.puts("_process_server_message(#{inspect(subscription)})")

GenServer.cast(subscription, {:process_push, fn -> Response.parse(message) end})
end

defp _respond_on({:client_identified, _correlation_id}, _),
do: :ok
Expand Down
35 changes: 25 additions & 10 deletions test/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ defmodule ExtremeSubscriptionsTest do
end

def handle_call({:on_event, event} = message, _from, state) do
# IO.puts("Subscriber {:on_event, event} (#{inspect(message)})")
send(state.sender, message)
{:reply, :ok, %{state | received: [event | state.received]}}
end

def handle_call({:on_event, event, _correlation_id} = message, _from, state) do
# IO.puts("Subscriber {:on_event, event, _correlation_id} (#{inspect(message)})")
send(state.sender, message)
{:reply, :ok, %{state | received: [event | state.received]}}
end
Expand Down Expand Up @@ -528,38 +530,51 @@ defmodule ExtremeSubscriptionsTest do

test "events written while reading stream are also pushed to client in correct order" do
stream = Helpers.random_stream_name()
num_events = 1_000
num_events = 200
# prepopulate stream
events1 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x}"} end)

# events2 =
# 1..num_events
# |> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}", stuff: Enum.shuffle(10000..20000) |> Enum.take(1)} end)

events2 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}"} end)

{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, events1))
Enum.each(events1, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream, [e]))
end)

spawn(fn ->
Enum.each(events2, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, [e]))
end)

Logger.debug("Second pack of events written")
end)

# Process.sleep(100)

# subscribe to existing stream
{:ok, subscriber} = Subscriber.start_link()

{:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2)
# TODO make it more likely or guaranteed that the race would fail

spawn(fn ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, events2))
{:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2)

Logger.debug("Second pack of events written")
end)
Logger.debug("Second pack of events written")

# assert first events are received
for _ <- 1..num_events, do: assert_receive({:on_event, _event})

Logger.debug("First pack of events received")

# assert second pack of events is received as well
for _ <- 1..num_events, do: assert_receive({:on_event, _event})
for _ <- 1..num_events, do: assert_receive({:on_event, _event}, 1_000)

# assert :caught_up is received when existing events are read
assert_receive :caught_up
Expand Down
2 changes: 1 addition & 1 deletion test/support/helpers.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule ExtremeTest.Events do
defmodule(PersonCreated, do: defstruct([:name]))
defmodule(PersonCreated, do: defstruct([:name, :stuff]))
defmodule(PersonChangedName, do: defstruct([:name]))
defmodule(SlowProcessingEventHappened, do: defstruct([:sleep]))
end
Expand Down

0 comments on commit adc8b31

Please sign in to comment.