Skip to content

Commit

Permalink
Add test for subscribing during writes
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyvanriet committed Jan 30, 2024
1 parent 4961056 commit 99b9beb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ config :ex_unit,

config :extreme, TestConn,
db_type: "node",
host: "localhost",
host: System.get_env("EVENTSTORE_HOST") || "localhost",
port: "1113",
username: "admin",
password: "changeit",
Expand Down
56 changes: 56 additions & 0 deletions test/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,62 @@ defmodule ExtremeSubscriptionsTest do
Helpers.unsubscribe(TestConn, subscription)
end


test "events written while subscribing are also pushed to client in correct order" do
stream = Helpers.random_stream_name()
num_events = 200
# prepopulate stream
events1 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x}"} end)

events2 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}"} end)

Enum.each(events1, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream, [e]))
end)

# bombard the stream with individual event writes in the background
spawn(fn ->
Enum.each(events2, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, [e]))
end)

Logger.debug("Second pack of events written")
end)

# subscribe to existing stream
{:ok, subscriber} = Subscriber.start_link()
{:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2)

Logger.debug("Second pack of events written")

# assert first events are received
for _ <- 1..num_events, do: assert_receive({:on_event, _event})

Logger.debug("First pack of events received")

# assert second pack of events is received as well
for _ <- 1..num_events, do: assert_receive({:on_event, _event}, 1_000)

# assert :caught_up is received when existing events are read
assert_receive :caught_up

# check if events came in correct order.
assert Subscriber.received_events(subscriber) == events1 ++ events2

{:ok, %ExMsg.ReadStreamEventsCompleted{} = response} =
TestConn.execute(Helpers.read_events(stream, 0, 2_000))

assert events1 ++ events2 ==
Enum.map(response.events, fn event -> :erlang.binary_to_term(event.event.data) end)

Helpers.unsubscribe(TestConn, subscription)
end

test "ack timeout can be adjusted" do
sleep = 5_001
stream = Helpers.random_stream_name()
Expand Down

0 comments on commit 99b9beb

Please sign in to comment.