From be436cdbc248dcc44f36b35800b1075f9a01c4ed Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Wed, 12 May 2021 10:10:04 +0200 Subject: [PATCH 1/8] [FanoutListener] Guarantee established subscription before start_link returns --- .travis.yml | 29 ----------------------------- README.md | 2 +- lib/fanout_listener.ex | 4 ++-- test/extreme_benchmark_test.exs | 3 ++- 4 files changed, 5 insertions(+), 33 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 55fe447..0000000 --- a/.travis.yml +++ /dev/null @@ -1,29 +0,0 @@ -language: elixir - -otp_release: - - 22.2 - - 23.1 -elixir: - - 1.10.4 - - 1.11.0 - -env: - - ES_TARBALL='ubuntu/EventStore-OSS-Linux-Ubuntu-18.04-v5.0.8.tar.gz' ES_VERSION=4 - -script: - - "mix do deps.get, format --check-formatted, compile --force --warnings-as-errors, test --cover" - -before_install: - - wget https://eventstore.org/downloads/${ES_TARBALL} - - tar xf EventStore*.tar.gz - - cd EventStore* && ./run-node.sh --run-projections=all --start-standard-projections=true --mem-db 2>&1> eventstore.log & - -after_success: - - mix inch.report - - head -n 7 EventStore*/eventstore.log - -after_failure: - - cat EventStore*/eventstore.log - -after_script: - - killall -SIGINT clusternode diff --git a/README.md b/README.md index bea6e30..73dc8e9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Extreme -[![Build Status](https://travis-ci.org/exponentially/extreme.svg?branch=v1.0.0)](https://travis-ci.org/exponentially/extreme) +[![Build Status](https://github.com/exponentially/extreme/actions/workflows/test.yml/badge.svg)](https://github.com/exponentially/extreme/actions/workflows/test.yml) [![Hex version](https://img.shields.io/hexpm/v/extreme.svg "Hex version")](https://hex.pm/packages/extreme) [![InchCI](https://inch-ci.org/github/exponentially/extreme.svg?branch=v1.0.0)](https://inch-ci.org/github/exponentially/extreme) [![Coverage Status](https://coveralls.io/repos/github/exponentially/extreme/badge.svg?branch=v1.0.0)](https://coveralls.io/github/exponentially/extreme?branch=v1.0.0) diff --git a/lib/fanout_listener.ex b/lib/fanout_listener.ex index 6073e68..1d2467c 100644 --- a/lib/fanout_listener.ex +++ b/lib/fanout_listener.ex @@ -86,8 +86,8 @@ defmodule Extreme.FanoutListener do stream_name: stream_name } - GenServer.cast(self(), :subscribe) - {:ok, state} + {:ok, subscription, ref} = _subscribe(state) + {:ok, %{state | subscription: subscription, subscription_ref: ref}} end @impl true diff --git a/test/extreme_benchmark_test.exs b/test/extreme_benchmark_test.exs index 8f3ac73..583decf 100644 --- a/test/extreme_benchmark_test.exs +++ b/test/extreme_benchmark_test.exs @@ -75,7 +75,8 @@ defmodule ExtremeBenchmarkTest do {time, _} = :timer.tc(fn -> - TestConn.execute(Helpers.write_events(stream, initial_events), nil, 10_000) + {:ok, %Extreme.Messages.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, initial_events), nil, 10_000) end) time From 3f58a5e160725a9693935898b230d913232f126d Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Thu, 13 May 2021 13:28:16 +0200 Subject: [PATCH 2/8] update deps --- .github/workflows/test.yml | 6 +- README.md | 522 +------------------------------------ mix.exs | 3 +- mix.lock | 3 +- 4 files changed, 9 insertions(+), 525 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 40903c6..fd3dd90 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,13 +17,13 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - elixir: ['1.9.4', '1.10.4', '1.11.3'] - erlang: ['22.3', '23.2'] + elixir: ['1.9.4', '1.10.4', '1.11.4'] + erlang: ['22.3', '23.3'] steps: - uses: actions/checkout@v2 - name: Set up Elixir - uses: erlef/setup-elixir@v1 + uses: erlef/setup-beam@v1 with: elixir-version: ${{ matrix.elixir }} otp-version: ${{ matrix.erlang }} diff --git a/README.md b/README.md index 73dc8e9..3f42ffd 100644 --- a/README.md +++ b/README.md @@ -8,524 +8,6 @@ Erlang/Elixir TCP client for [Event Store](http://geteventstore.com/). -This version is tested with EventStore 3.9.3, 4.1.1 and 5.0.10, Elixir 1.5 - 1.11 and Erlang/OTP 19.3 - 23.1 +This version is tested with EventStore 3.9.3, 4.1.1 and 5.0.10, Elixir 1.9 - 1.11 and Erlang/OTP 22.3 - 24.0 -## INSTALL - -Add Extreme as a dependency in your `mix.exs` file. - -```elixir -def deps do - [{:extreme, "~> 1.0.0-rc1"}] -end -``` - -After you are done, run `mix deps.get` in your shell to fetch and compile Extreme and its dependencies. - -### EventStore v4 and later note - -Starting from EventStore version 4.0 there are some upgrades to communication protocol. Event number size is changed to 64bits -and there is new messages `IdentifyClient` and `ClientIdentified`. Since we would like to keep backward compatibility with older v3 protocol, -we introduced new configuration for `:extreme` application, where you have to set `:protocol_version` equal to `4` if you want to use new protocol, default is `3`. -Below is exact line you have to add in you application config file in order to activate new protocol: - -```elixir -config :extreme, :protocol_version, 4 -``` - -## USAGE - -The best way to understand how adapter should be used is by investigating `test/extreme_test.exs` file, -but we'll try to explain some details in here as well. - -Extreme is implemented using GenServer and is OTP compatible. -If client is disconnected from server we are not trying to reconnect, instead you should rely on your supervisor. -For example: - -```elixir -defmodule MyApp.Supervisor do - use Supervisor - - def start_link do - Supervisor.start_link __MODULE__, :ok - end - - @event_store MyApp.EventStore - - def init(:ok) do - event_store_settings = Application.get_env :my_app, :event_store - - children = [ - worker(Extreme, [event_store_settings, [name: @event_store]]), - # ... other workers / supervisors - ] - supervise children, strategy: :one_for_one - end -end -``` - -You can manually start adapter as well (as you can see in test file): - -```elixir -{:ok, server} = Application.get_env(:extreme, :event_store) |> Extreme.start_link -``` - -From now on, `server` pid is used for further communication. Since we are relying on supervisor to reconnect, -it is wise to name `server` as we did in example above. - - -### MODES - -Extreme can connect to single ES node or to cluster specified with node IPs and ports. - -Example for connecting to single node: - -```elixir -config :extreme, :event_store, - db_type: :node, - host: "localhost", - port: 1113, - username: "admin", - password: "changeit", - reconnect_delay: 2_000, - connection_name: :my_app, - max_attempts: :infinity -``` - -* `db_type` - defaults to :node, thus it can be omitted -* `host` - check EXT IP setting of your EventStore -* `port` - check EXT TCP PORT setting of your EventStore -* `reconnect_delay` - in ms. Defaults to 1_000. If tcp connection fails this is how long it will wait for reconnection. -* `connection_name` - Optional param introduced in EventStore 4. Connection can be identified by this name on ES UI -* `max_attempts` - Defaults to :infinity. Specifies how many times we'll try to connect to EventStore - - -Example for connecting to cluster: - -```elixir -config :extreme, :event_store, - db_type: :cluster, - gossip_timeout: 300, - mode: :read, - nodes: [ - %{host: "10.10.10.29", port: 2113}, - %{host: "10.10.10.28", port: 2113}, - %{host: "10.10.10.30", port: 2113} - ], - connection_name: :my_app, - username: "admin", - password: "changeit" -``` - -* `gossip_timeout` - in ms. Defaults to 1_000. We are iterating through `nodes` list, asking for cluster member details. -This setting represents timeout for gossip response before we are asking next node from `nodes` list for cluster details. -* `nodes` - Mandatory for cluster connection. Represents list of nodes in the cluster as we know it - * `host` - should be EXT IP setting of your EventStore node - * `port` - should be EXT HTTP PORT setting of your EventStore node -* `mode` - Defaults to `:write` where Master node is prefered over Slave, otherwise prefer Slave over Master - -Example of connection to cluster via DNS lookup - -```elixir -config :extreme, :event_store, - db_type: :cluster_dns, - gossip_timeout: 300, - host: "es-cluster.example.com", # accepts char list too, this whould be multy A record host enrty in your nameserver - port: 2113, # the external gossip port - connection_name: :my_app, - username: "admin", - password: "changeit", - mode: :write, - max_attempts: :infinity -``` - -When `cluster` mode is used, adapter goes thru `nodes` list and tries to gossip with node one after another -until it gets response about nodes. Based on nodes information from that response it ranks their statuses and chooses -the best candidate to connect to. For `:write` mode (default) `Master` node is prefered over `Slave`, -but for `:read` mode it is opposite. For the way ranking is done, take a look at `lib/cluster_connection.ex`: - -```elixir -defp rank_state("Master", :write), do: 1 -defp rank_state("Master", _), do: 2 -defp rank_state("PreMaster", :write), do: 2 -defp rank_state("PreMaster", _), do: 3 -defp rank_state("Slave", :write), do: 3 -defp rank_state("Slave", _), do: 1 -``` - -Note that above will work with same procedure with `cluster_dns` mode turned on, since internally it will get ip addresses to which the same connection procedure will be used. - -Once client is disconnected from EventStore, supervisor should respawn it and connection starts over again. - -### Read-only clients - -Extreme modules may be configured as read-only with the `:read_only` option -(default: `false`) - -```elixir -config :extreme, :event_store, - db_type: :node, - host: "localhost", - port: 1113, - username: "admin", - password: "changeit", - reconnect_delay: 2_000, - connection_name: :my_app, - max_attempts: :infinity, - read_only: true # <- marked as read-only -``` - -Read-only modules are not allowed to perform write operations like writing -events or deleting streams. Read-only clients may execute the following -messages: - -- `ReadEvent` -- `ReadStreamEvents` -- `ReadStreamEventsBackward` -- `ReadAllEvents` -- `ConnectToPersistentSubscription` -- `SubscribeToStream` -- `UnsubscribeFromStream` - -Use read-only clients to ensure that a listener does not commit writes. This is -particularly useful for Read Models. - -### Communication - -EventStore uses ProtoBuf for taking requests and sending responses back. -We are using [exprotobuf](https://github.com/bitwalker/exprotobuf) to deal with them. -List and specification of supported protobuf messages can be found in `include/event_store.proto` file. - -Instead of wrapping each and every request in elixir function, we are using `execute/2` function that takes server pid and request message: - -```elixir -{:ok, response} = Extreme.execute server, write_events() -``` - -where `write_events` can be helper function like: - -```elixir -alias Extreme.Msg, as: ExMsg - -defp write_events(stream \\ "people", events \\ [%PersonCreated{name: "Pera Peric"}, %PersonChangedName{name: "Zika"}]) do - proto_events = Enum.map(events, fn event -> - ExMsg.NewEvent.new( - event_id: Extreme.Tools.gen_uuid(), - event_type: to_string(event.__struct__), - data_content_type: 0, - metadata_content_type: 0, - data: :erlang.term_to_binary(event), - metadata: "" - ) end) - ExMsg.WriteEvents.new( - event_stream_id: stream, - expected_version: -2, - events: proto_events, - require_master: false - ) -end -``` - -This way you can fine tune your requests, i.e. choose your serialization. We are using erlang serialization in this case -`data: :erlang.term_to_binary(event)`, but you can do whatever suites you. -For more information about protobuf messages EventStore uses, -take a look at their [documentation](http://docs.geteventstore.com) or for common use cases -you can check `test/extreme_test.exs` file. - - -### Subscriptions - -`Extreme.subscribe_to/3` function is used to get notified on new events on particular stream. -This way subscriber, in next example `self`, will get message `{:on_event, push_message}` when new event is added to stream -_people_. - -```elixir -def subscribe(server, stream \\ "people"), do: Extreme.subscribe_to(server, self, stream) - -def handle_info({:on_event, event}, state) do - Logger.debug "New event added to stream 'people': #{inspect event}" - {:noreply, state} -end -``` - - -`Extreme.read_and_stay_subscribed/7` reads all events that follow a specified event number, and subscribes to future events. - -```elixir -defmodule MyApp.StreamSubscriber - use GenServer - - def start_link(extreme, last_processed_event), do: GenServer.start_link __MODULE__, {extreme, last_processed_event} - - def init({extreme, last_processed_event}) do - stream = "people" - state = %{ event_store: extreme, stream: stream, last_event: last_processed_event } - GenServer.cast self, :subscribe - {:ok, state} - end - - def handle_cast(:subscribe, state) do - # read only unprocessed events and stay subscribed - {:ok, subscription} = Extreme.read_and_stay_subscribed state.event_store, self, state.stream, state.last_event + 1 - # we want to monitor when subscription is crashed so we can resubscribe - ref = Process.monitor subscription - {:noreply, %{state|subscription_ref: ref}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do - GenServer.cast self, :subscribe - {:noreply, state} - end - def handle_info({:on_event, push}, state) do - push.event.data - |> :erlang.binary_to_term - |> process_event - event_number = push.link.event_number - :ok = update_last_event state.stream, event_number - {:noreply, %{state|last_event: event_number}} - end - def handle_info(:caught_up, state) do - Logger.debug "We are up to date!" - {:noreply, state} - end - def handle_info(_msg, state), do: {:noreply, state} - - defp process_event(event), do: IO.puts("Do something with #{inspect event}") - defp update_last_event(_stream, _event_number), do: IO.puts("Persist last processed event_number for stream") -end -``` - -This way unprocessed events will be sent by Extreme, using `{:on_event, push}` message. -After all persisted messages are sent, :caught_up message is sent and then new messages will be sent the same way -as they arrive to stream. - -If you subscribe to non existing stream you'll receive message {:extreme, severity, problem, stream} where severity can be either `:error` (for subscription on hard deleted stream) or `:warn` (for subscription on non existing or soft deleted stream). Problem is explanation of problem (i.e. :stream_hard_deleted). So in your receiver you can either have catch all `handle_info(_message, _state)` or you can handle such message: - -```elixir -def handle_info({:extreme, _, problem, stream}=message, state) do - Logger.warn "Stream #{stream} issue: #{to_string problem}" - {:noreply, state} -end -``` - -### Extreme.Listener - -Since it is common on read side of system to read events and denormalize them, -there is Extreme.Listener macro that hides noise from listener: - -```elixir -defmodule MyApp.MyListener do - use Extreme.Listener - import MyApp.MyProcessor - - # returns last processed event by MyListener on stream_name, -1 if none has been processed so far - defp get_last_event(stream_name), do: DB.get_last_event MyListener, stream_name - - defp process_push(push, stream_name) do - #for indexed stream we need to follow push.link.event_number, otherwise push.event.event_number - event_number = push.link.event_number - DB.in_transaction fn -> - Logger.info "Do some processing of event #{inspect push.event.event_type}" - :ok = push.event.data - |> :erlang.binary_to_term - |> process_event(push.event.event_type) - DB.ack_event(MyListener, stream_name, event_number) - end - {:ok, event_number} - end - - # This override is optional - defp caught_up, do: Logger.debug("We are up to date. YEEEY!!!") -end - -defmodule MyApp.MyProcessor do - def process_event(data, "Elixir.MyApp.Events.PersonCreated") do - Logger.debug "Doing something with #{inspect data}" - :ok - end - def process_event(_, _), do: :ok # Just acknowledge events we are not interested in -end -``` - -Listener can be started manually but it is most common to place it in supervisor AFTER specifing Extreme: - -```elixir -defmodule MyApp.Supervisor do - use Supervisor - - def start_link, do: Supervisor.start_link __MODULE__, :ok - - @event_store MyApp.EventStore - - def init(:ok) do - event_store_settings = Application.get_env :my_app, :event_store - - children = [ - worker(Extreme, [event_store_settings, [name: @event_store]]), - worker(MyApp.MyListener, [@event_store, "my_indexed_stream", [name: MyListener]]), - # ... other workers / supervisors - ] - supervise children, strategy: :one_for_one - end -end -``` - -Subscription can be paused: - -```elixir -{:ok, last_event_number} = MyApp.MyListener.pause MyListener -``` - -and resumed - -```elixir -:ok = MyApp.MyListener.resume MyListener -``` - -### Extreme.FanoutListener - -It's not uncommon situation to listen live events and propagate them (for example on web sockets). -For that situation there is Extreme.FanoutListener macro that hides noise from listener: - -```elixir -defmodule MyApp.MyFanoutListener do - use Extreme.FanoutListener - import MyApp.MyPusher - - defp process_push(push) do - Logger.info "Forward to web socket event #{inspect push.event.event_type}" - :ok = push.event.data - |> :erlang.binary_to_term - |> process_event(push.event.event_type) - end -end - -defmodule MyApp.MyPusher do - def process_event(data, "Elixir.MyApp.Events.PersonCreated") do - Logger.debug "Transform and push event with data: #{inspect data}" - :ok - end - def process_event(_, _), do: :ok # Just acknowledge events we are not interested in -end -``` - -Listener can be started manually but it is most common to place it in supervisor AFTER specifing Extreme: - -```elixir -defmodule MyApp.Supervisor do - use Supervisor - - def start_link, do: Supervisor.start_link __MODULE__, :ok - - @event_store MyApp.EventStore - - def init(:ok) do - event_store_settings = Application.get_env :my_app, :event_store - - children = [ - worker(Extreme, [event_store_settings, [name: @event_store]]), - worker(MyApp.MyFanoutListener, [@event_store, "my_indexed_stream", [name: MyFanoutListener]]), - # ... other workers / supervisors - ] - supervise children, strategy: :one_for_one - end -end -``` - -### Persistent subscriptions - -The Event Store provides an alternate event subscription model, from version 3.2.0, known as [competing consumers](http://docs.geteventstore.com/introduction/latest/competing-consumers). Instead of the client holding the state of the subscription, the server remembers it. - -#### Create a persistent subscription - -The first step in using persistent subscriptions is to create a new subscription. This can be done using the Event Store admin website or in your application code, as shown below. You must provide a unique subscription group name and the stream to receive events from. - -```elixir -alias Extreme.Msg, as: ExMsg - -{:ok, _} = Extreme.execute(server, ExMsg.CreatePersistentSubscription.new( - subscription_group_name: "person-subscription", - event_stream_id: "people", - resolve_link_tos: false, - start_from: 0, - message_timeout_milliseconds: 10_000, - record_statistics: false, - live_buffer_size: 500, - read_batch_size: 20, - buffer_size: 500, - max_retry_count: 10, - prefer_round_robin: true, - checkpoint_after_time: 1_000, - checkpoint_max_count: 500, - checkpoint_min_count: 1, - subscriber_max_count: 1 -)) -``` - -#### Connect to a persistent subscription - -`Extreme.connect_to_persistent_subscription/5` function is used subscribe to an existing persistent subscription. The subscriber, in this example `self`, will receive message `{:on_event, push_message}` when each new event is added to stream -_people_. - -```elixir -{:ok, subscription} = Extreme.connect_to_persistent_subscription(server, self(), group, stream, buffer_size) -``` - -#### Receive & acknowledge events - -You must acknowledge receipt, and successful processing, of each received event. The Event Store will remember the last acknowledged event. The subscription will resume from this position should the subscriber process terminate and reconnect. This simplifies the client logic - the code you must write. - -`Extreme.PersistentSubscription.ack/3` function is used to acknowledge receipt of an event. - -```elixir -receive do - {:on_event, event, correlation_id} -> - Logger.debug "New event added to stream 'people': #{inspect event}" - :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id) -end -``` - -You must track the `subscription` PID returned from the `Extreme.connect_to_persistent_subscription/5` function as part of the process state when using a `GenServer` subscriber. - -```elixir -def handle_info({:on_event, event, correlation_id}, %{subscription: subscription} = state) do - Logger.debug "New event added to stream 'people': #{inspect event}" - :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id) - {:noreply, state} -end -``` - -Events can also be not acknowledged. They can be not acknowledged with a nack_action of :Park, :Retry, :Skip, or :Stop. - -```elixir -def handle_info({:on_event, event, correlation_id}, %{subscription: subscription} = state) do - Logger.debug "New event added to stream 'people': #{inspect event}" - if needs_to_retry do - :ok = Extreme.PersistentSubscription.nack(subscription, event, correlation_id, :Retry) - else - :ok = Extreme.PersistentSubscription.ack(subscription, event, correlation_id) - end - {:noreply, state} -end -``` - -## Building modules from .proto file - -Follow steps 1. and 2. from https://github.com/tony612/protobuf-elixir#generate-elixir-code, -then run: - -```bash -protoc --elixir_out=./ include/event_store.proto && \ -sed -i '' 's/EventStore\.Client/Extreme/g' include/event_store.pb.ex && \ -mv include/event_store.pb.ex lib/extreme/messages.ex -``` - -## Contributing - -1. Fork it -2. Create your feature branch (`git checkout -b my-new-feature`) -3. Commit your changes (`git commit -am 'add some feature'`) -4. Push to the branch (`git push origin my-new-feature`) -5. Create new Pull Request - -## Licensed under The MIT License. +Documentation is TBD diff --git a/mix.exs b/mix.exs index c0687b3..6fff50e 100644 --- a/mix.exs +++ b/mix.exs @@ -37,6 +37,7 @@ defmodule Extreme.Mixfile do [ {:exprotobuf, "~> 1.2.9"}, {:elixir_uuid, "~> 1.2"}, + {:telemetry, "~> 0.4"}, # needed when connecting to EventStore cluster (node_type: :cluster | :cluster_dns) {:jason, "~> 1.1", optional: true}, @@ -52,7 +53,7 @@ defmodule Extreme.Mixfile do defp _package do [ files: ["lib", "mix.exs", "README*", "LICENSE*", "include"], - maintainers: ["Milan Burmaja", "Milan Jaric"], + maintainers: ["Milan Burmaja"], licenses: ["MIT"], links: %{"GitHub" => "https://github.com/exponentially/extreme"} ] diff --git a/mix.lock b/mix.lock index 88b7f37..10ec3b6 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,7 @@ "exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, - "exvcr": {:hex, :exvcr, "0.12.2", "e8fc0beeb62924d3b755b2718a161b13cb4ed53311378e5e587606c15190c8ed", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "63776555a1bd003ff60635aead47461ced1ff985c66427421ad344e317ba983c"}, + "exvcr": {:hex, :exvcr, "0.12.3", "eca9e0dc8727eed65621c334d34a9640760b40f748a871728cbdbe534d336c11", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "668bd08fad65bb21d4c9bb46f1747e5e01b5734d6387fa792ce0f2eb81b17b2b"}, "gpb": {:hex, :gpb, "4.17.3", "b546f9ad69584508274e49f9bf883b99fa67092444adeff5dc624e9123efc003", [:make, :rebar3], [], "hexpm", "13918b1c371adf9a6d3892e6b1570c28e284233f714fe2a7f01d8a556aa41541"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"}, @@ -16,4 +16,5 @@ "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } From 3067eb5142e7343ce32f6bd8737e63e6bb0cf0e2 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Mon, 16 Aug 2021 11:51:54 +0200 Subject: [PATCH 3/8] Use EventStore as a github service rather then action step --- .github/workflows/test.yml | 25 ++++++++++++++----------- lib/extreme/request_manager.ex | 2 +- lib/extreme/tcp.ex | 1 - lib/listener.ex | 4 +--- test/fanout_listener_test.exs | 4 +--- 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fd3dd90..13d39ea 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,6 @@ on: - v1.0.0 env: MIX_ENV: test - ES_TARBALL: "ubuntu/EventStore-OSS-Linux-Ubuntu-18.04-v5.0.10.tar.gz" ES_VERSION: 4 jobs: @@ -17,8 +16,20 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - elixir: ['1.9.4', '1.10.4', '1.11.4'] - erlang: ['22.3', '23.3'] + elixir: ['1.11.4', '1.12.2'] + erlang: ['22.3', '23.3', '24.0'] + + services: + es: + image: eventstore/eventstore:release-5.0.10 + ports: ['1113:1113'] + env: + EVENTSTORE_RUN_PROJECTIONS: "All" + EVENTSTORE_START_STANDARD_PROJECTIONS: "true" + EVENTSTORE_CLUSTER_SIZE: 1 + EVENTSTORE_EXT_TCP_PORT: 1113 + EVENTSTORE_INSECURE: "true" + EVENTSTORE_ENABLE_EXTERNAL_TCP: "true" steps: - uses: actions/checkout@v2 @@ -43,13 +54,5 @@ jobs: restore-keys: ${{ runner.os }}-build_test-${{ matrix.elixir }}-${{ matrix.erlang }}- - name: Check warnings run: mix compile --warnings-as-errors --force - - name: Check formatting - run: mix format --check-formatted - - name: Prepare EventStore - run: | - wget https://raw.githubusercontent.com/EventStore/Downloads/master/${ES_TARBALL} - tar xzvf EventStore*.tar.gz - rm EventStore*.tar.gz - cd EventStore* && ./run-node.sh --run-projections=all --start-standard-projections=true --mem-db 2>&1> eventstore.log & - name: Run tests run: mix test --cover diff --git a/lib/extreme/request_manager.ex b/lib/extreme/request_manager.ex index ba136f0..d31fe90 100644 --- a/lib/extreme/request_manager.ex +++ b/lib/extreme/request_manager.ex @@ -148,7 +148,7 @@ defmodule Extreme.RequestManager do _from, %State{read_only: true} = state ) - when not (message_type in @read_only_message_types) do + when message_type not in @read_only_message_types do {:reply, {:error, :read_only}, state} end diff --git a/lib/extreme/tcp.ex b/lib/extreme/tcp.ex index aadb1e3..10855ca 100644 --- a/lib/extreme/tcp.ex +++ b/lib/extreme/tcp.ex @@ -43,7 +43,6 @@ defmodule Extreme.Tcp do reason -> Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) - {:error, reason} connect(host, port, configuration, attempt + 1) end end diff --git a/lib/listener.ex b/lib/listener.ex index 2b6074b..4ea1b35 100644 --- a/lib/listener.ex +++ b/lib/listener.ex @@ -138,9 +138,7 @@ defmodule Extreme.Listener do ref = Process.monitor(subscription) Logger.info(fn -> - "Listener subscribed to stream #{state.stream_name}. Start processing live events from event no: #{ - last_event + 1 - }" + "Listener subscribed to stream #{state.stream_name}. Start processing live events from event no: #{last_event + 1}" end) {:ok, diff --git a/test/fanout_listener_test.exs b/test/fanout_listener_test.exs index ef81e31..88f3441 100644 --- a/test/fanout_listener_test.exs +++ b/test/fanout_listener_test.exs @@ -9,9 +9,7 @@ defmodule Extreme.FanoutListenerTest do defp process_push(push) do IO.puts( - "pushing event ##{push.event.event_number} to test #{ - inspect(Process.whereis(:fanout_test)) - }" + "pushing event ##{push.event.event_number} to test #{inspect(Process.whereis(:fanout_test))}" ) send(:fanout_test, {:processing_push, push.event.event_type, push.event.data}) From 9aea03cc7b42e9ad113dc86a0cf1c3b7b9dfdfd8 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Fri, 27 Aug 2021 10:54:12 +0200 Subject: [PATCH 4/8] support telemetry ~> 1.0.0 --- mix.exs | 4 ++-- mix.lock | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mix.exs b/mix.exs index 6fff50e..97a9435 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.0.0", + version: "1.0.1", elixir: "~> 1.7", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme", @@ -37,7 +37,7 @@ defmodule Extreme.Mixfile do [ {:exprotobuf, "~> 1.2.9"}, {:elixir_uuid, "~> 1.2"}, - {:telemetry, "~> 0.4"}, + {:telemetry, "~> 0.4 or ~> 1.0"}, # needed when connecting to EventStore cluster (node_type: :cluster | :cluster_dns) {:jason, "~> 1.1", optional: true}, diff --git a/mix.lock b/mix.lock index 10ec3b6..58a696a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,13 +1,13 @@ %{ "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"}, + "ex_doc": {:hex, :ex_doc, "0.25.1", "4b736fa38dc76488a937e5ef2944f5474f3eff921de771b25371345a8dc810bc", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3200b0a69ddb2028365281fbef3753ea9e728683863d8cdaa96580925c891f67"}, "exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, - "exvcr": {:hex, :exvcr, "0.12.3", "eca9e0dc8727eed65621c334d34a9640760b40f748a871728cbdbe534d336c11", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "668bd08fad65bb21d4c9bb46f1747e5e01b5734d6387fa792ce0f2eb81b17b2b"}, - "gpb": {:hex, :gpb, "4.17.3", "b546f9ad69584508274e49f9bf883b99fa67092444adeff5dc624e9123efc003", [:make, :rebar3], [], "hexpm", "13918b1c371adf9a6d3892e6b1570c28e284233f714fe2a7f01d8a556aa41541"}, + "exvcr": {:hex, :exvcr, "0.13.2", "e17fd3ee3a341f41a3aa65a3ce73a339759a9d0658f83782492c6e9b6cf9daa4", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.8.0", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "17f41a533d14f582fe6b5f83214f058cf5ba77c6a7bc15bc53a9ea1827d92d96"}, + "gpb": {:hex, :gpb, "4.18.0", "305548ad991583f4b9809e905d6a17475ab8df85116cb3c1269fda1e4424c7ea", [:make, :rebar3], [], "hexpm", "c2bb843866e627e1e0181c2f5e4b3ee79cbb8cd3574f66767e7f1d5130d6b025"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, @@ -16,5 +16,5 @@ "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"}, - "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, + "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, } From c467bba50f96df4a37cce561252efbac9b0d351a Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Wed, 13 Oct 2021 15:18:00 +0200 Subject: [PATCH 5/8] Test with ES 21.6.0 --- .github/workflows/test.yml | 58 ++++++++++++++++++++++++++++++++++---- .gitignore | 2 ++ docker-compose.yml | 21 ++++++++++++++ test/extreme_test.exs | 1 + 4 files changed, 77 insertions(+), 5 deletions(-) create mode 100644 docker-compose.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 13d39ea..6a0cd0d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,17 +11,17 @@ env: ES_VERSION: 4 jobs: - graph_conn: - name: Build and test + ES-5: + name: ES 5.0.11 runs-on: ubuntu-latest strategy: matrix: - elixir: ['1.11.4', '1.12.2'] - erlang: ['22.3', '23.3', '24.0'] + elixir: ['1.11.4', '1.12.3'] + erlang: ['22.3', '23.3', '24.1'] services: es: - image: eventstore/eventstore:release-5.0.10 + image: eventstore/eventstore:release-5.0.11 ports: ['1113:1113'] env: EVENTSTORE_RUN_PROJECTIONS: "All" @@ -56,3 +56,51 @@ jobs: run: mix compile --warnings-as-errors --force - name: Run tests run: mix test --cover + + ES-21: + name: ES 21.6.0 + runs-on: ubuntu-latest + strategy: + matrix: + elixir: ['1.12.3'] + erlang: ['24.1'] + + services: + es: + image: eventstore/eventstore:21.6.0-buster-slim + ports: ['1113:1113'] + env: + EVENTSTORE_RUN_PROJECTIONS: "All" + EVENTSTORE_START_STANDARD_PROJECTIONS: "true" + EVENTSTORE_CLUSTER_SIZE: 1 + EVENTSTORE_EXT_TCP_PORT: 1113 + EVENTSTORE_INSECURE: "true" + EVENTSTORE_ENABLE_EXTERNAL_TCP: "true" + + steps: + - uses: actions/checkout@v2 + - name: Set up Elixir + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{ matrix.elixir }} + otp-version: ${{ matrix.erlang }} + - name: Restore dependencies cache + uses: actions/cache@v2 + with: + path: deps + key: ${{ runner.os }}-deps-${{ matrix.elixir }}-${{ matrix.erlang }}-${{ hashFiles('mix.lock') }} + restore-keys: ${{ runner.os }}-deps-${{ matrix.elixir }}-${{ matrix.erlang }}- + - name: Install dependencies + run: mix deps.get + - name: Restore builds cache + uses: actions/cache@v2 + with: + path: _build/test + key: ${{ runner.os }}-build_test-${{ matrix.elixir }}-${{ matrix.erlang }}-${{ hashFiles('mix.lock') }} + restore-keys: ${{ runner.os }}-build_test-${{ matrix.elixir }}-${{ matrix.erlang }}- + - name: Check warnings + run: mix compile --warnings-as-errors --force + - name: Check formatting + run: mix format --check-formatted + - name: Run tests + run: mix test --cover --exclude=authentication diff --git a/.gitignore b/.gitignore index e43edc7..f43196c 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ extra-*.tar # asdf settings .tool-versions* + +/volumes/ diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..938e28d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,21 @@ +version: '3.4' + +services: + event_store: + #image: eventstore/eventstore:release-5.0.11 + image: eventstore/eventstore:21.6.0-buster-slim + restart: always + environment: + EVENTSTORE_RUN_PROJECTIONS: "All" + EVENTSTORE_START_STANDARD_PROJECTIONS: "true" + EVENTSTORE_CLUSTER_SIZE: 1 + EVENTSTORE_EXT_TCP_PORT: 1113 + EVENTSTORE_INSECURE: "true" + EVENTSTORE_ENABLE_EXTERNAL_TCP: "true" + EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP: "true" + ports: + - 1113:1113 + - 2113:2113 + volumes: + - ./volumes/eventstore/data:/var/lib/eventstore + - ./volumes/eventstore/log:/var/log/eventstore diff --git a/test/extreme_test.exs b/test/extreme_test.exs index 5572c35..cacbd38 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -41,6 +41,7 @@ defmodule ExtremeTest do describe "Authentication" do defmodule(ForbiddenConn, do: use(Extreme)) + @tag :authentication test ".execute/1 is not authenticated for wrong credentials" do {:ok, _} = :extreme From 924cf2b83cb2eae1251083c3ee4b9cb22b205364 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Wed, 11 Oct 2023 18:07:20 +0100 Subject: [PATCH 6/8] Remove warnings in elixir 1.15 / otp-26 --- config/config.exs | 2 +- config/dev.exs | 2 +- config/prod.exs | 2 +- config/test.exs | 2 +- lib/extreme/cluster_connection.ex | 2 +- lib/extreme/connection.ex | 2 +- lib/extreme/reading_subscription.ex | 2 +- lib/extreme/request_manager.ex | 2 +- lib/extreme/tcp.ex | 2 +- lib/fanout_listener.ex | 14 +++++++------- lib/listener.ex | 2 +- mix.exs | 4 ++-- mix.lock | 20 ++++++++++---------- test/extreme/tcp_test.exs | 2 +- 14 files changed, 30 insertions(+), 30 deletions(-) diff --git a/config/config.exs b/config/config.exs index 8233fe9..871a3d1 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,3 @@ -use Mix.Config +import Config import_config "#{Mix.env()}.exs" diff --git a/config/dev.exs b/config/dev.exs index 16d2412..dc1e72d 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :logger, :console, level: :debug, diff --git a/config/prod.exs b/config/prod.exs index d2d855e..becde76 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -1 +1 @@ -use Mix.Config +import Config diff --git a/config/test.exs b/config/test.exs index 5e45d5d..ea059db 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :logger, :console, format: "$time $metadata[$level] $message\n", diff --git a/lib/extreme/cluster_connection.ex b/lib/extreme/cluster_connection.ex index d612ba4..55c75cf 100644 --- a/lib/extreme/cluster_connection.ex +++ b/lib/extreme/cluster_connection.ex @@ -63,5 +63,5 @@ defmodule Extreme.ClusterConnection do defp _rank_state("Manager", _), do: 0 defp _rank_state("ShuttingDown", _), do: 0 defp _rank_state("Shutdown", _), do: 0 - defp _rank_state(state, _), do: Logger.warn("Unrecognized node state: #{state}") + defp _rank_state(state, _), do: Logger.warning("Unrecognized node state: #{state}") end diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index af65dcd..8ac5e1d 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -68,7 +68,7 @@ defmodule Extreme.Connection do @impl true def terminate(reason, state) do - Logger.warn("[Extreme] Connection terminated: #{inspect(reason)}") + Logger.warning("[Extreme] Connection terminated: #{inspect(reason)}") RequestManager.kill_all_subscriptions(state.base_name) end diff --git a/lib/extreme/reading_subscription.ex b/lib/extreme/reading_subscription.ex index c70d2e2..994c1eb 100644 --- a/lib/extreme/reading_subscription.ex +++ b/lib/extreme/reading_subscription.ex @@ -109,7 +109,7 @@ defmodule Extreme.ReadingSubscription do end defp _process_read_response({:error, :no_stream, _}, state) do - Logger.warn(fn -> "Stream doesn't exist yet" end) + Logger.warning(fn -> "Stream doesn't exist yet" end) {:extreme, :warn, :stream_soft_deleted, state.read_params.stream} |> _caught_up(state) diff --git a/lib/extreme/request_manager.ex b/lib/extreme/request_manager.ex index d31fe90..a7e9cc5 100644 --- a/lib/extreme/request_manager.ex +++ b/lib/extreme/request_manager.ex @@ -287,7 +287,7 @@ defmodule Extreme.RequestManager do end def handle_cast(:kill_all_subscriptions, %State{} = state) do - Logger.warn("[Extreme] Killing all subscriptions") + Logger.warning("[Extreme] Killing all subscriptions") state.base_name |> Extreme.SubscriptionsSupervisor.kill_all_subscriptions() diff --git a/lib/extreme/tcp.ex b/lib/extreme/tcp.ex index 10855ca..d6cb254 100644 --- a/lib/extreme/tcp.ex +++ b/lib/extreme/tcp.ex @@ -42,7 +42,7 @@ defmodule Extreme.Tcp do {:ok, socket} reason -> - Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) + Logger.warning(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) connect(host, port, configuration, attempt + 1) end end diff --git a/lib/fanout_listener.ex b/lib/fanout_listener.ex index 1d2467c..ae8c29a 100644 --- a/lib/fanout_listener.ex +++ b/lib/fanout_listener.ex @@ -12,7 +12,7 @@ defmodule Extreme.FanoutListener do defmodule MyApp.MyFanoutListener do use Extreme.FanoutListener import MyApp.MyPusher - + defp process_push(push) do Logger.info "Forward to web socket event #{inspect push.event.event_type}" :ok = push.event.data @@ -20,7 +20,7 @@ defmodule Extreme.FanoutListener do |> process_event(push.event.event_type) end end - + defmodule MyApp.MyPusher do def process_event(data, "Elixir.MyApp.Events.PersonCreated") do Logger.debug "Transform and push event with data: #{inspect data}" @@ -33,14 +33,14 @@ defmodule Extreme.FanoutListener do defmodule MyApp.Supervisor do use Supervisor - + def start_link, do: Supervisor.start_link __MODULE__, :ok - + @event_store MyApp.EventStore - + def init(:ok) do event_store_settings = Application.get_env :my_app, :event_store - + children = [ supervisr(MyExtreme, [event_store_settings]), worker(MyApp.MyFanoutListener, [MyExtreme, "my_indexed_stream", [name: MyFanoutListener]]), @@ -112,7 +112,7 @@ defmodule Extreme.FanoutListener do @impl true def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do reconnect_delay = 1_000 - Logger.warn("Subscription to EventStore is down. Will retry in #{reconnect_delay} ms.") + Logger.warning("Subscription to EventStore is down. Will retry in #{reconnect_delay} ms.") :timer.sleep(reconnect_delay) GenServer.cast(self(), :subscribe) {:noreply, state} diff --git a/lib/listener.ex b/lib/listener.ex index 4ea1b35..f8f88d3 100644 --- a/lib/listener.ex +++ b/lib/listener.ex @@ -109,7 +109,7 @@ defmodule Extreme.Listener do def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do reconnect_delay = 1_000 - Logger.warn("Subscription to EventStore is down. Will retry in #{reconnect_delay} ms.") + Logger.warning("Subscription to EventStore is down. Will retry in #{reconnect_delay} ms.") :timer.sleep(reconnect_delay) GenServer.cast(self(), :subscribe) {:noreply, state} diff --git a/mix.exs b/mix.exs index 97a9435..49d58e0 100644 --- a/mix.exs +++ b/mix.exs @@ -4,8 +4,8 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.0.1", - elixir: "~> 1.7", + version: "1.0.2", + elixir: "~> 1.10", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme", description: """ diff --git a/mix.lock b/mix.lock index 58a696a..e69aef9 100644 --- a/mix.lock +++ b/mix.lock @@ -1,20 +1,20 @@ %{ "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "ex_doc": {:hex, :ex_doc, "0.25.1", "4b736fa38dc76488a937e5ef2944f5474f3eff921de771b25371345a8dc810bc", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3200b0a69ddb2028365281fbef3753ea9e728683863d8cdaa96580925c891f67"}, + "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, - "exvcr": {:hex, :exvcr, "0.13.2", "e17fd3ee3a341f41a3aa65a3ce73a339759a9d0658f83782492c6e9b6cf9daa4", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.8.0", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "17f41a533d14f582fe6b5f83214f058cf5ba77c6a7bc15bc53a9ea1827d92d96"}, - "gpb": {:hex, :gpb, "4.18.0", "305548ad991583f4b9809e905d6a17475ab8df85116cb3c1269fda1e4424c7ea", [:make, :rebar3], [], "hexpm", "c2bb843866e627e1e0181c2f5e4b3ee79cbb8cd3574f66767e7f1d5130d6b025"}, - "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "exvcr": {:hex, :exvcr, "0.14.4", "1aa5fe7d3f10b117251c158f8d28b39f7fc73d0a7628b2d0b75bf8cfb1111576", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.16", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "4e600568c02ed29d46bc2e2c74927d172ba06658aa8b14705c0207363c44cc94"}, + "gpb": {:hex, :gpb, "4.20.0", "3617fb7762801c1bed0460ed496d5bab0f4d76e26b923a9ad9abf026c2220e8a", [:make, :rebar3], [], "hexpm", "4268756a98674d5159eeaa0fd4501b7e43b2dd7f10a1f052499e9b09f107d40b"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"}, - "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"}, - "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } diff --git a/test/extreme/tcp_test.exs b/test/extreme/tcp_test.exs index d866194..18b9b88 100644 --- a/test/extreme/tcp_test.exs +++ b/test/extreme/tcp_test.exs @@ -2,7 +2,7 @@ defmodule Extreme.TcpTest do use ExUnit.Case, async: true alias Extreme.{Tcp, Configuration} - @test_configuration Application.get_env(:extreme, TestConn) + @test_configuration Application.compile_env(:extreme, TestConn) describe "connect/3" do test "returns {:ok, socket} for correct host and port" do From 328af2fa6b09642a655fed5a624a6ff1c7744f33 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Thu, 19 Oct 2023 19:27:35 +0100 Subject: [PATCH 7/8] [Listener] Add subscribe/unsubscribe and auto_subscribe? --- CHANGELOG.md | 117 ++++++++++++++--------- lib/extreme/cluster_connection.ex | 2 +- lib/listener.ex | 29 ++++-- mix.exs | 4 +- test/extreme/cluster_connection_test.exs | 2 +- test/extreme/tcp_test.exs | 2 +- test/extreme/tools_test.exs | 2 +- test/listener_test.exs | 75 +++++++++++++++ 8 files changed, 174 insertions(+), 59 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db6f557..3d4e53a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,95 +1,118 @@ +# Changelog for extreme v1.0.3 + +- Add subscribe/unsubscribe and auto_subscribe? option for starting `Extreme.Listener` + # Changelog for extreme v1.0.0-beta2 - * Restart all subscriptions and subscribers/listeners when connection receives :tcp_closed + +- Restart all subscriptions and subscribers/listeners when connection receives :tcp_closed # Changelog for extreme v0.13.1 - * Dependency version upgrades + +- Dependency version upgrades # Changelog for extreme v0.13.0 - * Support Elixir 1.7.0 and OTP 21.0 - * Listener reads events in chunks of 500 events (instead of 4096) - + +- Support Elixir 1.7.0 and OTP 21.0 +- Listener reads events in chunks of 500 events (instead of 4096) + # Changelog for extreme v0.11.0 - * Added support for EventStore 4 - + +- Added support for EventStore 4 + # Changelog for extreme v0.10.4 - * Fixed issue with concurrent read and write where messages get stuck into extreme process state - + +- Fixed issue with concurrent read and write where messages get stuck into extreme process state + # Changelog for extreme v0.10.3 - * Extreme.Listener - if get_last_event/1 returns `:from_now`, catching events will start from current event + +- Extreme.Listener - if get_last_event/1 returns `:from_now`, catching events will start from current event # Changelog for Extreme v0.10.2 - * Fix end of patching in Listener + +- Fix end of patching in Listener # Changelog for Extreme v0.10.1 - * Dependecy upgrades - * Tested with Elixir 1.5.2 and OTP 20.1 + +- Dependecy upgrades +- Tested with Elixir 1.5.2 and OTP 20.1 # Changelog for Extreme v0.9.2 - * Added support for nacking messages from persistent connections (thanks to [@nathanfox](https://github.com/nathanfox)) + +- Added support for nacking messages from persistent connections (thanks to [@nathanfox](https://github.com/nathanfox)) # Changelog for Extreme v0.9.1 - * Support persistent subscriptions on projection streams (e.g. `$ce-category`) + +- Support persistent subscriptions on projection streams (e.g. `$ce-category`) # Changelog for Extreme v0.9.0 - * Added support for persistent connections (thanks to [@slashdotdash](https://github.com/slashdotdash)) - * BREAKING CHANGE: Module `Extreme.Messages` is renamed to `Extreme.Msg` + +- Added support for persistent connections (thanks to [@slashdotdash](https://github.com/slashdotdash)) +- BREAKING CHANGE: Module `Extreme.Messages` is renamed to `Extreme.Msg` # Changelog for Extreme v0.8.1 - * Added pause, resume and patch functionalities for Extreme.Listener + +- Added pause, resume and patch functionalities for Extreme.Listener # Changelog for Extreme v0.8.0 - * Tested with Elixir 1.4.0 with fixed warnings - * Listener won't crash if ES is down. It will try to reconnect each 1sec instead of immediately - * Extreme.Listener.caught_up/0 callback is public now - * Bumped up all dependency versions + +- Tested with Elixir 1.4.0 with fixed warnings +- Listener won't crash if ES is down. It will try to reconnect each 1sec instead of immediately +- Extreme.Listener.caught_up/0 callback is public now +- Bumped up all dependency versions # Changelog for Extreme v0.7.1 - * When connecting to ES cluster choose mode :write (default) to prefer Master over Slave or :read for opposite + +- When connecting to ES cluster choose mode :write (default) to prefer Master over Slave or :read for opposite # Changelog for Extreme v0.7.0 - * When read_and_stay_subscribed/7 function is called, :caught_up message is sent to subscriber after existing events - are read (or if there were no events) and before new events arrive. This is sign to your listener that you are - up-to-date. If you don't have catch all handle_info/2 in your receiver this is breaking change! + +- When read_and_stay_subscribed/7 function is called, :caught_up message is sent to subscriber after existing events + are read (or if there were no events) and before new events arrive. This is sign to your listener that you are + up-to-date. If you don't have catch all handle_info/2 in your receiver this is breaking change! # Changelog for Extreme v0.6.2 - * Added Extreme.FanoutListener - * Added inline documentation + +- Added Extreme.FanoutListener +- Added inline documentation # Changelog for Extreme v0.6.1 - * Removed PersistentSubscription related proto messages since when compiled - they generate files longer then 100 characters and as such release can't be built + +- Removed PersistentSubscription related proto messages since when compiled + they generate files longer then 100 characters and as such release can't be built # Changelog for Extreme v0.6.0 - * Added Extreme.Listener + +- Added Extreme.Listener # Changelog for Extreme v0.5.5 - * Removed PersistentSubscription related proto messages since when compiled - they generate files longer then 100 characters and as such release can't be built + +- Removed PersistentSubscription related proto messages since when compiled + they generate files longer then 100 characters and as such release can't be built # Changelog for Extreme v0.5.4 - * Read events backward (see example in test file) - * Some code cleanup and proto file updated (thanks to @mindreframer) - * Tested with Elixir 1.3.2 and EventStore 3.9.0 + +- Read events backward (see example in test file) +- Some code cleanup and proto file updated (thanks to @mindreframer) +- Tested with Elixir 1.3.2 and EventStore 3.9.0 # Changelog for Extreme v0.5.3 - * Tested with Elixir 1.3.2 and EventStore 3.6.2 +- Tested with Elixir 1.3.2 and EventStore 3.6.2 # Changelog for Extreme v0.5.2 - * Minor improvements - * Upgraded depencey versions - * Tested with Elixir 1.3.0 and EventStore 3.6.2 +- Minor improvements +- Upgraded depencey versions +- Tested with Elixir 1.3.0 and EventStore 3.6.2 # Changelog for Extreme v0.5.1 - * Stop Extreme process when tcp is closed by EventStore - * Adding Dns cluster connection support for configuration - * Tested with Elixir 1.2.5 and EventStore 3.5.0 - +- Stop Extreme process when tcp is closed by EventStore +- Adding Dns cluster connection support for configuration +- Tested with Elixir 1.2.5 and EventStore 3.5.0 # Changelog for Extreme v0.5.0 - * You can subscribe to non-existing stream now with subscribe_to/4 and read_and_stay_subscribed/7 functions. If you do such thing however, you'll be sent message {:extreme, severity, problem, stream}. If you don't have catch all handle_info/2 in your receiver this is breaking change. - * More tests added - * Tested with Elixir 1.2.3 and EventStore 3.4.0 +- You can subscribe to non-existing stream now with subscribe_to/4 and read_and_stay_subscribed/7 functions. If you do such thing however, you'll be sent message {:extreme, severity, problem, stream}. If you don't have catch all handle_info/2 in your receiver this is breaking change. +- More tests added +- Tested with Elixir 1.2.3 and EventStore 3.4.0 diff --git a/lib/extreme/cluster_connection.ex b/lib/extreme/cluster_connection.ex index 55c75cf..d6b91f8 100644 --- a/lib/extreme/cluster_connection.ex +++ b/lib/extreme/cluster_connection.ex @@ -10,7 +10,7 @@ defmodule Extreme.ClusterConnection do def gossip_with([], _, _), do: {:error, :no_more_gossip_seeds} def gossip_with([node | rest_nodes], gossip_timeout, mode) do - url = 'http://#{node.host}:#{node.port}/gossip?format=json' + url = ~c"http://#{node.host}:#{node.port}/gossip?format=json" Logger.info("Gossip with #{url}") case :httpc.request(:get, {url, []}, [timeout: gossip_timeout], []) do diff --git a/lib/listener.ex b/lib/listener.ex index f8f88d3..edc00f6 100644 --- a/lib/listener.ex +++ b/lib/listener.ex @@ -31,20 +31,24 @@ defmodule Extreme.Listener do {resolve_link_tos, opts} = Keyword.pop(opts, :resolve_link_tos, true) {require_master, opts} = Keyword.pop(opts, :require_master, false) {ack_timeout, opts} = Keyword.pop(opts, :ack_timeout, 5_000) + {auto_subscribe?, opts} = Keyword.pop(opts, :auto_subscribe?, true) opts = Keyword.put_new(opts, :name, __MODULE__) GenServer.start_link( __MODULE__, - {extreme, stream_name, read_per_page, resolve_link_tos, require_master, ack_timeout}, + {extreme, stream_name, read_per_page, resolve_link_tos, require_master, ack_timeout, + auto_subscribe?}, opts ) end - def unsubscribe(server), do: GenServer.call(server, :unsubscribe) + def unsubscribe(server \\ __MODULE__), do: GenServer.call(server, :unsubscribe) + def subscribe(server \\ __MODULE__), do: GenServer.cast(server, :subscribe) @impl true def init( - {extreme, stream_name, read_per_page, resolve_link_tos, require_master, ack_timeout} + {extreme, stream_name, read_per_page, resolve_link_tos, require_master, ack_timeout, + auto_subscribe?} ) do state = %{ extreme: extreme, @@ -59,7 +63,11 @@ defmodule Extreme.Listener do ack_timeout: ack_timeout } - GenServer.cast(self(), :subscribe) + :ok = on_init(state) + + if auto_subscribe?, + do: GenServer.cast(self(), :subscribe) + {:ok, state} end @@ -75,6 +83,10 @@ defmodule Extreme.Listener do end def handle_call(:unsubscribe, from, state) do + Logger.info( + "#{__MODULE__} unsubscribed from #{state.stream_name}. Last processed event: #{state.last_event}" + ) + true = Process.demonitor(state.subscription_ref) :unsubscribed = state.extreme.unsubscribe(state.subscription) {:reply, :ok, %{state | subscription: nil, subscription_ref: nil}} @@ -138,7 +150,7 @@ defmodule Extreme.Listener do ref = Process.monitor(subscription) Logger.info(fn -> - "Listener subscribed to stream #{state.stream_name}. Start processing live events from event no: #{last_event + 1}" + "#{__MODULE__} subscribed to stream #{state.stream_name}. Start processing live events from event no: #{last_event + 1}" end) {:ok, @@ -162,11 +174,16 @@ defmodule Extreme.Listener do end def caught_up, do: Logger.debug(fn -> "We are up to date" end) + def on_init(_), do: Logger.info(fn -> "#{__MODULE__} started" end) def register_patching_start(_, _, _), do: {:error, :not_implemented} def patching_done(_), do: {:error, :not_implemented} def process_patch(_, _), do: {:error, :not_implemented} - defoverridable caught_up: 0, register_patching_start: 3, patching_done: 1, process_patch: 2 + defoverridable caught_up: 0, + on_init: 1, + register_patching_start: 3, + patching_done: 1, + process_patch: 2 end end end diff --git a/mix.exs b/mix.exs index 49d58e0..6ce66a3 100644 --- a/mix.exs +++ b/mix.exs @@ -4,8 +4,8 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.0.2", - elixir: "~> 1.10", + version: "1.0.3", + elixir: "~> 1.11", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme", description: """ diff --git a/test/extreme/cluster_connection_test.exs b/test/extreme/cluster_connection_test.exs index 9a88fae..bef834d 100644 --- a/test/extreme/cluster_connection_test.exs +++ b/test/extreme/cluster_connection_test.exs @@ -7,7 +7,7 @@ defmodule Extreme.ClusterConnectionTest do test "returns master node ip and tcp port when configured for writing" do # vcr cassette file is customized after recording to return master as localhost use_cassette "gossip_with_clusters_existing_node" do - assert {:ok, 'localhost', 1113} = + assert {:ok, ~c"localhost", 1113} = Extreme.ClusterConnection.gossip_with( [%{host: "0.0.0.0", port: "2113"}], 20_000, diff --git a/test/extreme/tcp_test.exs b/test/extreme/tcp_test.exs index 18b9b88..c1fbac5 100644 --- a/test/extreme/tcp_test.exs +++ b/test/extreme/tcp_test.exs @@ -12,7 +12,7 @@ defmodule Extreme.TcpTest do end test "returns {:error, :max_attempt_exceeded} for incorrect port when `max_attempts` exceeds" do - host = 'localhost' + host = ~c"localhost" port = 1609 assert {:error, :max_attempt_exceeded} = Tcp.connect(host, port, max_attempts: 1) diff --git a/test/extreme/tools_test.exs b/test/extreme/tools_test.exs index 7d0c0f8..38904d5 100644 --- a/test/extreme/tools_test.exs +++ b/test/extreme/tools_test.exs @@ -19,7 +19,7 @@ defmodule Extreme.ToolsTest do assert cast_to_atom(1) == 1 assert cast_to_atom(2.0) == 2.0 assert cast_to_atom(true) == true - assert cast_to_atom('hello world') == 'hello world' + assert cast_to_atom(~c"hello world") == ~c"hello world" end end end diff --git a/test/listener_test.exs b/test/listener_test.exs index afcad91..749f304 100644 --- a/test/listener_test.exs +++ b/test/listener_test.exs @@ -117,4 +117,79 @@ defmodule Extreme.ListenerTest do :ok = MyListener.unsubscribe(listener) Helpers.assert_no_leaks(TestConn) end + + test "subscribe/unsubscribe" do + stream = Helpers.random_stream_name() + event1 = %Event.PersonCreated{name: "Pera"} + event2 = %Event.PersonChangedName{name: "Zika"} + event3 = %Event.PersonChangedName{name: "Laza"} + + assert DB.get_last_event(MyListener, stream) == -1 + + # write 2 events to stream + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [event1, event2])) + + # run listener and expect it to read them + {:ok, _listener} = MyListener.start_link(TestConn, stream, read_per_page: 2) + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.PersonCreated" + assert event1 == :erlang.binary_to_term(event) + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.PersonChangedName" + assert event2 == :erlang.binary_to_term(event) + assert DB.get_last_event(MyListener, stream) == 1 + + # unsubscribe and write new event + :ok = MyListener.unsubscribe() + + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [event3])) + + # new event is not processed + refute_receive {:processing_push, _event_type, _event} + + # when subscribed again, listener resumes where it stopped + :ok = MyListener.subscribe() + + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.PersonChangedName" + assert event3 == :erlang.binary_to_term(event) + assert DB.get_last_event(MyListener, stream) == 2 + + refute_receive {:processing_push, _event_type, _event} + + :ok = MyListener.unsubscribe() + Helpers.assert_no_leaks(TestConn) + end + + test "auto_subscribe? can be turned off" do + stream = Helpers.random_stream_name() + event1 = %Event.PersonCreated{name: "Pera"} + event2 = %Event.PersonChangedName{name: "Zika"} + + assert DB.get_last_event(MyListener, stream) == -1 + + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [event1, event2])) + + # run listener and expect they are not processed + {:ok, _listener} = + MyListener.start_link(TestConn, stream, read_per_page: 2, auto_subscribe?: false) + + refute_receive {:processing_push, _event_type, _event}, 1_000 + + # turn on subscription and expect that events are processed + :ok = MyListener.subscribe() + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.PersonCreated" + assert event1 == :erlang.binary_to_term(event) + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.PersonChangedName" + assert event2 == :erlang.binary_to_term(event) + assert DB.get_last_event(MyListener, stream) == 1 + + :ok = MyListener.unsubscribe() + Helpers.assert_no_leaks(TestConn) + end end From 5e42ff9c3d5a186413e86ed5d4a439293b81091a Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Fri, 20 Oct 2023 10:28:58 +0100 Subject: [PATCH 8/8] Allow Listener.process_push/2 to return 'stop' --- CHANGELOG.md | 5 +++++ lib/listener.ex | 22 +++++++++++++++++----- mix.exs | 2 +- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d4e53a..2424bfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# Changelog for extreme v1.0.4 + +- Listener.process_push callback can return `:stop`, meaning subscription should be stopped + and pushes that are already in mailbox should be purged. + # Changelog for extreme v1.0.3 - Add subscribe/unsubscribe and auto_subscribe? option for starting `Extreme.Listener` diff --git a/lib/listener.ex b/lib/listener.ex index edc00f6..20f9d70 100644 --- a/lib/listener.ex +++ b/lib/listener.ex @@ -72,17 +72,29 @@ defmodule Extreme.Listener do end @impl true + def handle_call({:on_event, _push}, _from, %{subscription: nil, mode: :live} = state), + do: {:reply, :ok, state} + def handle_call( {:on_event, push}, _from, %{subscription: subscription, mode: :live} = state - ) - when not is_nil(subscription) do - {:ok, event_number} = process_push(push, state.stream_name) - {:reply, :ok, %{state | last_event: event_number}} + ) do + push + |> process_push(state.stream_name) + |> case do + {:ok, event_number} -> + {:reply, :ok, %{state | last_event: event_number}} + + :stop -> + _unsubscribe(state) + end end - def handle_call(:unsubscribe, from, state) do + def handle_call(:unsubscribe, _from, state), + do: _unsubscribe(state) + + defp _unsubscribe(state) do Logger.info( "#{__MODULE__} unsubscribed from #{state.stream_name}. Last processed event: #{state.last_event}" ) diff --git a/mix.exs b/mix.exs index 6ce66a3..b1b9853 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.0.3", + version: "1.0.4", elixir: "~> 1.11", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme",