Skip to content

Commit

Permalink
rename private function names
Browse files Browse the repository at this point in the history
  • Loading branch information
burmajam committed Jun 18, 2024
1 parent f9b4371 commit db0fd8e
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions lib/kelvin/in_order_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ defmodule Kelvin.InOrderSubscription do
def handle_info(:check_auto_subscribe, state) do
identifier = "#{inspect(__MODULE__)} (#{inspect(state.self)})"

if do_function(state.config.subscribe_on_init?) do
if _do_function(state.config.subscribe_on_init?) do
Logger.info("#{identifier} subscribing to '#{state.config.stream_name}'")

GenStage.async_info(self(), :subscribe)
Expand All @@ -97,7 +97,7 @@ defmodule Kelvin.InOrderSubscription do
Logger.warn("#{inspect(__MODULE__)} is already subscribed.")
# coveralls-ignore-stop
else
case subscribe(state) do
case _subscribe(state) do
{:ok, sub} ->
Process.link(sub)
{:noreply, [], put_in(state.subscription, sub)}
Expand All @@ -119,10 +119,10 @@ defmodule Kelvin.InOrderSubscription do
case state do
%{demand: 0, buffer_size: size, max_buffer_size: max}
when size + 1 == max ->
{:noreply, [], enqueue(state, {event, from})}
{:noreply, [], _enqueue(state, {event, from})}

%{demand: 0} ->
{:reply, :ok, [], enqueue(state, event)}
{:reply, :ok, [], _enqueue(state, event)}

%{demand: demand} ->
{:reply, :ok, [{state.self, event}], put_in(state.demand, demand - 1)}
Expand All @@ -131,26 +131,26 @@ defmodule Kelvin.InOrderSubscription do

@impl GenStage
def handle_demand(demand, state) do
dequeue_events(state, demand, [])
_dequeue_events(state, demand, [])
end

defp dequeue_events(%{buffer_size: size} = state, demand, events)
defp _dequeue_events(%{buffer_size: size} = state, demand, events)
when size == 0 or demand == 0 do
{:noreply, :lists.reverse(events), put_in(state.demand, demand)}
end

defp dequeue_events(state, demand, events) do
case dequeue(state) do
defp _dequeue_events(state, demand, events) do
case _dequeue(state) do
{{:value, {event, from}}, state} ->
GenStage.reply(from, :ok)
dequeue_events(state, demand - 1, [{state.self, event} | events])
_dequeue_events(state, demand - 1, [{state.self, event} | events])

{{:value, event}, state} ->
dequeue_events(state, demand - 1, [{state.self, event} | events])
_dequeue_events(state, demand - 1, [{state.self, event} | events])
end
end

defp dequeue(state) do
defp _dequeue(state) do
case :queue.out(state.buffer) do
# coveralls-ignore-start
{:empty, buffer} ->
Expand All @@ -162,25 +162,25 @@ defmodule Kelvin.InOrderSubscription do
end
end

defp subscribe(state) do
defp _subscribe(state) do
state.config.connection
|> Extreme.RequestManager._name()
|> GenServer.call(
{:read_and_stay_subscribed, self(),
{state.config.stream_name,
do_function(state.config.restore_stream_position!) + 1,
_do_function(state.config.restore_stream_position!) + 1,
state.max_buffer_size, true, false, :infinity}},
:infinity
)
end

defp do_function(func) when is_function(func, 0), do: func.()
defp _do_function(func) when is_function(func, 0), do: func.()

defp do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
apply(m, f, a)
end

defp enqueue(state, element) do
defp _enqueue(state, element) do
%{
state
| buffer: :queue.in(element, state.buffer),
Expand Down

0 comments on commit db0fd8e

Please sign in to comment.