Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to use different sessions in Examples.SessionTest #14

Merged
merged 14 commits into from
Feb 16, 2024
Merged
33 changes: 29 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ env:

on:
push:
branches:
- "pojiro/0.2.0-dev"
branches: main
pull_request:

jobs:
build-deps:
Expand Down Expand Up @@ -71,7 +71,7 @@ jobs:
run: cargo fmt --all -- --check

- name: credo
run: mix credo --all
run: mix credo --ignore fixme

- name: restore plts cache
id: restore-plts-cache
Expand All @@ -96,7 +96,7 @@ jobs:
- name: dialyzer
run: mix dialyzer --format github

test:
test-with-one-session:
needs: build-deps
runs-on: ubuntu-latest

Expand All @@ -120,3 +120,28 @@ jobs:

- name: test
run: mix test --warnings-as-errors --cover

test-with-two-session:
needs: build-deps
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- uses: erlef/setup-beam@v1
id: setup-beam
with:
otp-version: ${{env.OTP_VERSION}}
elixir-version: ${{env.ELIXIR_VERSION}}

- uses: actions/cache/restore@v4
id: restore-deps-cache
with:
path: |
deps
_build
key: deps-${{ runner.os }}-${{ steps.setup-beam.outputs.otp-version }}-${{ steps.setup-beam.outputs.elixir-version }}-${{ hashFiles('**/*.lock') }}
restore-keys: deps-${{ runner.os }}-${{ steps.setup-beam.outputs.otp-version }}-${{ steps.setup-beam.outputs.elixir-version }}-

- name: test
run: USE_DIFFERENT_SESSION="" mix test --warnings-as-errors --cover
10 changes: 6 additions & 4 deletions lib/zenohex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ defmodule Zenohex do
"""
@spec open(Config.t()) :: {:ok, Session.t()} | {:error, reason :: any()}
def open(config \\ %Config{}) do
if System.get_env("SCOUTING_DELAY") == "0" do
Nif.zenoh_open(%Config{scouting: %Scouting{delay: 0}})
else
Nif.zenoh_open(config)
case System.get_env("SCOUTING_DELAY") do
nil ->
Nif.zenoh_open(config)

delay ->
Nif.zenoh_open(%Config{config | scouting: %Scouting{delay: String.to_integer(delay)}})
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/zenohex/examples/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Zenohex.Examples.Publisher do
Start Publisher.
"""
def start_link(args \\ %{}) when is_map(args) do
session = Map.get(args, :session, Zenohex.open!())
session = Map.get(args, :session) || Zenohex.open!()
key_expr = Map.get(args, :key_expr, "zenohex/examples/pub")
Supervisor.start_link(__MODULE__, %{session: session, key_expr: key_expr}, name: __MODULE__)
end
Expand Down
13 changes: 5 additions & 8 deletions lib/zenohex/examples/publisher/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ defmodule Zenohex.Examples.Publisher.Impl do

use GenServer

alias Zenohex.Session
alias Zenohex.Publisher

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
Expand All @@ -29,27 +26,27 @@ defmodule Zenohex.Examples.Publisher.Impl do
def init(args) do
session = Map.fetch!(args, :session)
key_expr = Map.fetch!(args, :key_expr)
{:ok, publisher} = Session.declare_publisher(session, key_expr)
{:ok, publisher} = Zenohex.Session.declare_publisher(session, key_expr)
{:ok, %{publisher: publisher}}
end

def handle_call({:put, value}, _from, state) do
:ok = Publisher.put(state.publisher, value)
:ok = Zenohex.Publisher.put(state.publisher, value)
{:reply, :ok, state}
end

def handle_call(:delete, _from, state) do
:ok = Publisher.delete(state.publisher)
:ok = Zenohex.Publisher.delete(state.publisher)
{:reply, :ok, state}
end

def handle_call({:congestion_control, value}, _from, state) do
publisher = Publisher.congestion_control(state.publisher, value)
publisher = Zenohex.Publisher.congestion_control(state.publisher, value)
{:reply, :ok, %{state | publisher: publisher}}
end

def handle_call({:priority, value}, _from, state) do
publisher = Publisher.priority(state.publisher, value)
publisher = Zenohex.Publisher.priority(state.publisher, value)
{:reply, :ok, %{state | publisher: publisher}}
end
end
2 changes: 1 addition & 1 deletion lib/zenohex/examples/pull_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Zenohex.Examples.PullSubscriber do
Start PullSubscriber.
"""
def start_link(args \\ %{}) when is_map(args) do
session = Map.get(args, :session, Zenohex.open!())
session = Map.get(args, :session) || Zenohex.open!()
key_expr = Map.get(args, :key_expr, "zenohex/examples/**")
callback = Map.get(args, :callback, &Logger.debug(inspect(&1)))

Expand Down
26 changes: 13 additions & 13 deletions lib/zenohex/examples/pull_subscriber/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ defmodule Zenohex.Examples.PullSubscriber.Impl do

require Logger

alias Zenohex.Session
alias Zenohex.PullSubscriber

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
Expand All @@ -21,16 +18,26 @@ defmodule Zenohex.Examples.PullSubscriber.Impl do
key_expr = Map.fetch!(args, :key_expr)
callback = Map.fetch!(args, :callback)

{:ok, pull_subscriber} = Session.declare_pull_subscriber(session, key_expr)
{:ok, pull_subscriber} = Zenohex.Session.declare_pull_subscriber(session, key_expr)
state = %{pull_subscriber: pull_subscriber, callback: callback}

send(self(), :loop)
recv_timeout(state)

{:ok, state}
end

def handle_info(:loop, state) do
case PullSubscriber.recv_timeout(state.pull_subscriber) do
recv_timeout(state)
{:noreply, state}
end

def handle_call(:pull, _from, state) do
:ok = Zenohex.PullSubscriber.pull(state.pull_subscriber)
{:reply, :ok, state}
end

defp recv_timeout(state) do
case Zenohex.PullSubscriber.recv_timeout(state.pull_subscriber) do
{:ok, sample} ->
state.callback.(sample)
send(self(), :loop)
Expand All @@ -41,12 +48,5 @@ defmodule Zenohex.Examples.PullSubscriber.Impl do
{:error, error} ->
Logger.error(inspect(error))
end

{:noreply, state}
end

def handle_call(:pull, _from, state) do
:ok = PullSubscriber.pull(state.pull_subscriber)
{:reply, :ok, state}
end
end
2 changes: 1 addition & 1 deletion lib/zenohex/examples/queryable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Zenohex.Examples.Queryable do
Start Queryable.
"""
def start_link(args \\ %{}) when is_map(args) do
session = Map.get(args, :session, Zenohex.open!())
session = Map.get(args, :session) || Zenohex.open!()
key_expr = Map.get(args, :key_expr, "zenohex/examples/**")
callback = Map.get(args, :callback, &Logger.debug(inspect(&1)))

Expand Down
17 changes: 9 additions & 8 deletions lib/zenohex/examples/queryable/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ defmodule Zenohex.Examples.Queryable.Impl do

require Logger

alias Zenohex.Session
alias Zenohex.Queryable

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
Expand All @@ -17,16 +14,22 @@ defmodule Zenohex.Examples.Queryable.Impl do
key_expr = Map.fetch!(args, :key_expr)
callback = Map.fetch!(args, :callback)

{:ok, queryable} = Session.declare_queryable(session, key_expr)
{:ok, queryable} = Zenohex.Session.declare_queryable(session, key_expr)
state = %{queryable: queryable, callback: callback}

send(self(), :loop)
recv_timeout(state)

{:ok, state}
end

def handle_info(:loop, state) do
case Queryable.recv_timeout(state.queryable) do
recv_timeout(state)

{:noreply, state}
end

def recv_timeout(state) do
case Zenohex.Queryable.recv_timeout(state.queryable) do
{:ok, query} ->
state.callback.(query)
send(self(), :loop)
Expand All @@ -37,7 +40,5 @@ defmodule Zenohex.Examples.Queryable.Impl do
{:error, error} ->
Logger.error(inspect(error))
end

{:noreply, state}
end
end
6 changes: 1 addition & 5 deletions lib/zenohex/examples/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,10 @@ defmodule Zenohex.Examples.Session do
Start Session.
"""
def start_link(args \\ %{}) when is_map(args) do
session = Map.get(args, :session, Zenohex.open!())
session = Map.get(args, :session) || Zenohex.open!()
Supervisor.start_link(__MODULE__, %{session: session}, name: __MODULE__)
end

@doc "Get session."
@spec session() :: Zenohex.Session.t()
defdelegate session(), to: Session.Impl

@doc "Put data."
@spec put(String.t(), integer() | float() | binary()) :: :ok
defdelegate put(key_expr, value), to: Session.Impl
Expand Down
8 changes: 0 additions & 8 deletions lib/zenohex/examples/session/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ defmodule Zenohex.Examples.Session.Impl do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

def session() do
GenServer.call(__MODULE__, :session)
end

def put(key_expr, value) do
GenServer.call(__MODULE__, {:put, key_expr, value})
end
Expand Down Expand Up @@ -40,10 +36,6 @@ defmodule Zenohex.Examples.Session.Impl do
}}
end

def handle_call(:session, _from, state) do
{:reply, state.session, state}
end

def handle_call({:put, key_expr, value}, _from, state) do
ret = Zenohex.Session.put(state.session, key_expr, value)
{:reply, ret, state}
Expand Down
2 changes: 1 addition & 1 deletion lib/zenohex/examples/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Zenohex.Examples.Storage do
Start storage.
"""
def start_link(args \\ %{}) when is_map(args) do
session = Map.get(args, :session, Zenohex.open!())
session = Map.get(args, :session) || Zenohex.open!()
key_expr = Map.get(args, :key_expr, "zenohex/examples/**")
Supervisor.start_link(__MODULE__, %{session: session, key_expr: key_expr}, name: __MODULE__)
end
Expand Down
19 changes: 7 additions & 12 deletions lib/zenohex/examples/storage/queryable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,28 @@ defmodule Zenohex.Examples.Storage.Queryable do

require Logger

alias Zenohex.Session
alias Zenohex.Queryable
alias Zenohex.Examples.Storage.Store
alias Zenohex.Query

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

def init(args) do
{:ok, queryable} = Session.declare_queryable(args.session, args.key_expr)
{:ok, queryable} = Zenohex.Session.declare_queryable(args.session, args.key_expr)
send(self(), :loop)
{:ok, %{queryable: queryable}}
end

def handle_info(:loop, state) do
case Queryable.recv_timeout(state.queryable) do
case Zenohex.Queryable.recv_timeout(state.queryable) do
{:ok, query} ->
case store(query) do
{:error, :not_found} ->
nil

{:ok, samples} ->
Enum.each(samples, &Query.reply(query, &1))
:ok = Query.finish_reply(query)
Enum.each(samples, &Zenohex.Query.reply(query, &1))
:ok = Zenohex.Query.finish_reply(query)
# following line is not needed, this is just example of double call
{:error, "ResponseFinal has already been sent"} = Query.finish_reply(query)
{:error, "ResponseFinal has already been sent"} = Zenohex.Query.finish_reply(query)
end

{:error, :timeout} ->
Expand All @@ -45,7 +40,7 @@ defmodule Zenohex.Examples.Storage.Queryable do
{:noreply, state}
end

defp store(query) when is_struct(query, Query) do
Store.get(query.key_expr)
defp store(query) when is_struct(query, Zenohex.Query) do
Zenohex.Examples.Storage.Store.get(query.key_expr)
end
end
4 changes: 1 addition & 3 deletions lib/zenohex/examples/storage/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ defmodule Zenohex.Examples.Storage.Store do

require Logger

alias Zenohex.KeyExpr

def start_link(initial_state) do
Agent.start_link(fn -> initial_state end, name: __MODULE__)
end
Expand Down Expand Up @@ -49,7 +47,7 @@ defmodule Zenohex.Examples.Storage.Store do
end

defp find_keys(map, key_expr) do
Map.keys(map) |> Enum.filter(&KeyExpr.intersects?(&1, key_expr))
Map.keys(map) |> Enum.filter(&Zenohex.KeyExpr.intersects?(&1, key_expr))
end

defp collect_samples(map, key_expr) do
Expand Down
11 changes: 6 additions & 5 deletions lib/zenohex/examples/storage/store_behaviour.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
defmodule Zenohex.Examples.Storage.StoreBehaviour do
@moduledoc false

alias Zenohex.Sample

@callback put(key_expr :: String.t(), sample :: Sample.t()) :: :ok | {:error, reason :: any()}
@callback delete(key_expr :: String.t()) :: :ok | {:error, reason :: any()}
@callback get(selector :: String.t()) :: {:ok, [Sample.t()]} | {:error, reason :: any()}
@callback put(key_expr :: String.t(), sample :: Zenohex.Sample.t()) ::
:ok | {:error, reason :: any()}
@callback delete(key_expr :: String.t()) ::
:ok | {:error, reason :: any()}
@callback get(selector :: String.t()) ::
{:ok, [Zenohex.Sample.t()]} | {:error, reason :: any()}
end
Loading