Merge branch 'release/v0.10.2' of https://github.com/membraneframework/membrane_core into release/v0.10.2
varsill committed Jul 26, 2022
2 parents a565ad9 + 711fc2c commit 3136a25
Showing 11 changed files with 305 additions and 26 deletions.
3 changes: 2 additions & 1 deletion lib/membrane/core/bin/pad_controller.ex
@@ -158,7 +158,8 @@ defmodule Membrane.Core.Bin.PadController do
:target_queue_size,
:min_demand_factor,
:auto_demand_size,
:toilet_capacity
:toilet_capacity,
:throttling_factor
] do
external_value || internal_value
else
1 change: 1 addition & 0 deletions lib/membrane/core/element.ex
@@ -64,6 +64,7 @@ defmodule Membrane.Core.Element do
node: #{node},
module: #{inspect(module)},
element options: #{inspect(user_options)},
method: #{method}
""")

# rpc if necessary
16 changes: 12 additions & 4 deletions lib/membrane/core/element/demand_handler.ex
@@ -109,16 +109,24 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
end

def handle_outgoing_buffers(pad_ref, %{mode: :push, toilet: toilet} = data, buffers, state)
def handle_outgoing_buffers(
pad_ref,
%{
mode: :push,
toilet: toilet
} = data,
buffers,
state
)
when toilet != nil do
%{other_demand_unit: other_demand_unit} = data
buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)

case Toilet.fill(toilet, buf_size) do
:ok ->
state
{:ok, toilet} ->
PadModel.set_data!(state, pad_ref, :toilet, toilet)

:overflow ->
{:overflow, _toilet} ->
# if the toilet has overflowed, we remove it so it doesn't overflow again
# and let the parent handle that situation by unlinking this output pad or crashing
PadModel.set_data!(state, pad_ref, :toilet, nil)
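
With this change, `Toilet.fill/2` returns `{:ok | :overflow, toilet}` instead of a bare atom, because the toilet now tracks the size of buffers that have not yet been flushed to the shared counter, so callers must keep the returned struct. A minimal sketch of the calling pattern, assuming `state`, `pad_ref` and `buf_size` are bound as in the handler above:

case Toilet.fill(toilet, buf_size) do
  {:ok, toilet} ->
    # the returned toilet carries the unflushed buffer size, so store it back
    PadModel.set_data!(state, pad_ref, :toilet, toilet)

  {:overflow, _toilet} ->
    # drop the toilet so it does not overflow repeatedly; the parent decides
    # whether to unlink this output pad or crash
    PadModel.set_data!(state, pad_ref, :toilet, nil)
end
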
1 change: 1 addition & 0 deletions lib/membrane/core/element/lifecycle_controller.ex
@@ -116,6 +116,7 @@ defmodule Membrane.Core.Element.LifecycleController do
@impl PlaybackHandler
def handle_playback_state(old_playback_state, new_playback_state, state) do
require CallbackContext.PlaybackChange

context = &CallbackContext.PlaybackChange.from_state/1
callback = PlaybackHandler.state_change_callback(old_playback_state, new_playback_state)

13 changes: 10 additions & 3 deletions lib/membrane/core/element/pad_controller.ex
@@ -68,9 +68,16 @@ defmodule Membrane.Core.Element.PadController do
:ok = Child.PadController.validate_pad_being_linked!(endpoint.pad_ref, direction, info, state)

toilet =
if direction == :input,
do: Toilet.new(endpoint.pad_props.toilet_capacity, info.demand_unit, self()),
else: nil
if direction == :input do
Toilet.new(
endpoint.pad_props.toilet_capacity,
info.demand_unit,
self(),
endpoint.pad_props.throttling_factor
)
else
nil
end

do_handle_link(endpoint, other_endpoint, info, toilet, link_props, state)
end
118 changes: 104 additions & 14 deletions lib/membrane/core/element/toilet.ex
@@ -7,35 +7,125 @@ defmodule Membrane.Core.Element.Toilet do

require Membrane.Logger

@opaque t :: {__MODULE__, :atomics.atomics_ref(), pos_integer, Process.dest()}
defmodule DistributedCounter do
@moduledoc false

# A module providing a common interface to access and modify a counter used in the toilet implementation.
# The counter uses the :atomics module under the hood.
# The module allows creating and modifying the counter's value in the same manner regardless of whether it is
# accessed from the same node or from a different node.

defmodule Worker do
@moduledoc false

# This GenServer is created when the counter is to be accessed from other nodes. It runs on the node
# where the :atomics variable lives, and processes from other nodes can ask it to modify the counter on their behalf.

use GenServer

@impl true
def init(parent_pid) do
Process.monitor(parent_pid)
{:ok, nil, :hibernate}
end

@impl true
def handle_call({:add_get, atomic_ref, value}, _from, _state) do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
end

@impl true
def handle_cast({:sub, atomic_ref, value}, _state) do
:atomics.sub(atomic_ref, 1, value)
{:noreply, nil}
end

@impl true
def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do
{:stop, :normal, state}
end
end

@type t :: {pid(), :atomics.atomics_ref()}

@spec new() :: t
def new() do
atomic_ref = :atomics.new(1, [])
{:ok, pid} = GenServer.start(Worker, self())
{pid, atomic_ref}
end

@spec add_get(t, integer()) :: integer()
def add_get({pid, atomic_ref}, value) when node(pid) == node(self()) do
:atomics.add_get(atomic_ref, 1, value)
end

def add_get({pid, atomic_ref}, value) do
GenServer.call(pid, {:add_get, atomic_ref, value})
end

@spec sub(t, integer()) :: :ok
def sub({pid, atomic_ref}, value) when node(pid) == node(self()) do
:atomics.sub(atomic_ref, 1, value)
end

def sub({pid, atomic_ref}, value) do
GenServer.cast(pid, {:sub, atomic_ref, value})
end
end

@opaque t ::
{__MODULE__, DistributedCounter.t(), pos_integer, Process.dest(), pos_integer(),
non_neg_integer()}

@default_capacity_factor 200

@spec new(pos_integer() | nil, Membrane.Buffer.Metric.unit_t(), Process.dest()) :: t
def new(capacity, demand_unit, responsible_process) do
@spec new(
pos_integer() | nil,
Membrane.Buffer.Metric.unit_t(),
Process.dest(),
pos_integer()
) :: t
def new(capacity, demand_unit, responsible_process, throttling_factor) do
default_capacity =
Membrane.Buffer.Metric.from_unit(demand_unit).buffer_size_approximation() *
@default_capacity_factor

toilet_ref = DistributedCounter.new()
capacity = capacity || default_capacity
{__MODULE__, :atomics.new(1, []), capacity, responsible_process}
{__MODULE__, toilet_ref, capacity, responsible_process, throttling_factor, 0}
end

@spec fill(t, non_neg_integer) :: :ok | :overflow
def fill({__MODULE__, atomic, capacity, responsible_process}, amount) do
size = :atomics.add_get(atomic, 1, amount)

if size > capacity do
overflow(size, capacity, responsible_process)
:overflow
@spec fill(t, non_neg_integer) :: {:ok | :overflow, t}
def fill(
{__MODULE__, counter, capacity, responsible_process, throttling_factor,
unrinsed_buffers_size},
amount
) do
if unrinsed_buffers_size + amount < throttling_factor do
{:ok,
{__MODULE__, counter, capacity, responsible_process, throttling_factor,
amount + unrinsed_buffers_size}}
else
:ok
size = DistributedCounter.add_get(counter, amount + unrinsed_buffers_size)

if size > capacity do
overflow(size, capacity, responsible_process)
{:overflow, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}}
else
{:ok, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}}
end
end
end

@spec drain(t, non_neg_integer) :: :ok
def drain({__MODULE__, atomic, _capacity, _responsible_process}, amount) do
:atomics.sub(atomic, 1, amount)
def drain(
{__MODULE__, counter, _capacity, _responsible_process, _throttling_factor,
_unrinsed_buff_size},
amount
) do
DistributedCounter.sub(counter, amount)
end

defp overflow(size, capacity, responsible_process) do
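
`DistributedCounter` wraps an `:atomics` counter so that the same calls work both from the node owning the counter (direct `:atomics` access) and from remote nodes (proxied through the `Worker` GenServer). A minimal usage sketch of this internal module, run on the owning node:

alias Membrane.Core.Element.Toilet.DistributedCounter

# create the :atomics counter and start the Worker that proxies updates from other nodes
counter = DistributedCounter.new()

# on the owning node this is a direct :atomics.add_get/3 call;
# from a remote node the same call goes through GenServer.call to the Worker
10 = DistributedCounter.add_get(counter, 10)

# decrements are fire-and-forget: :atomics.sub/3 locally, GenServer.cast remotely
:ok = DistributedCounter.sub(counter, 4)
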
13 changes: 11 additions & 2 deletions lib/membrane/parent_spec.ex
@@ -355,6 +355,13 @@ defmodule Membrane.ParentSpec do
for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
- `auto_demand_size` - Size of automatically generated demands. Used only for pads working in pull mode with automatic demands.
See `t:Membrane.Pad.mode_t/0` and `t:Membrane.Pad.demand_mode_t/0` for more info.
- `throttling_factor` - an integer specifying how frequently the sender should update the number of buffers in the `Toilet`. Defaults to 1,
meaning that the sender updates the toilet with every buffer sent. Setting this factor for elements
running on the same node has no impact on performance. However, once the sending and the receiving element are put on different nodes,
the sender updates the toilet with inter-process messages, so a bigger `throttling_factor` can reduce the number of messages
in the system.
At the same time, a greater `throttling_factor` means that a toilet overflow may be detected later.
See the _links_ section of the moduledoc for more information.
"""
@@ -363,7 +370,8 @@
toilet_capacity: number | nil,
target_queue_size: number | nil,
min_demand_factor: number | nil,
auto_demand_size: number | nil
auto_demand_size: number | nil,
throttling_factor: number | nil
) ::
link_builder_t() | no_return
def via_in(builder, pad, props \\ [])
@@ -388,7 +396,8 @@
target_queue_size: [default: nil],
min_demand_factor: [default: nil],
auto_demand_size: [default: nil],
toilet_capacity: [default: nil]
toilet_capacity: [default: nil],
throttling_factor: [default: 1]
)
|> case do
{:ok, props} ->
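
For illustration, a sketch of how `throttling_factor` can be passed to `via_in/3` (the `:source` and `:sink` names are placeholders). With a factor of 50, the sending element reports to the receiver's toilet only once it has accumulated at least 50 unreported units (in the pad's demand unit), which mainly pays off when the linked elements run on different nodes:

import Membrane.ParentSpec

# link two elements assumed to live on different nodes; the toilet counter
# is updated roughly every 50 demand units instead of on every buffer
links = [
  link(:source)
  |> via_in(:input, toilet_capacity: 100, throttling_factor: 50)
  |> to(:sink)
]
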
49 changes: 49 additions & 0 deletions test/membrane/core/element/toilet_test.exs
@@ -0,0 +1,49 @@
defmodule Membrane.Core.Element.ToiletTest do
use ExUnit.Case
alias Membrane.Core.Element.Toilet

setup do
[responsible_process: spawn(fn -> nil end)]
end

test "if toilet is implemented as :atomics for elements put on the same node", context do
toilet = Toilet.new(100, :buffers, context.responsible_process, 1)

{_module, {_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor,
_unrinsed_buffers} = toilet

Toilet.fill(toilet, 10)
assert :atomics.get(atomic_ref, 1) == 10
Toilet.drain(toilet, 10)
assert :atomics.get(atomic_ref, 1) == 0
end

test "if the receiving element uses toilet with :atomics and the sending element with a interprocess message, when the toilet is distributed",
context do
toilet = Toilet.new(100, :buffers, context.responsible_process, 1)

{_module, {counter_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor,
_unrinsed_buffers} = toilet

Toilet.fill(toilet, 10)
assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 10
assert :atomics.get(atomic_ref, 1) == 10
Toilet.drain(toilet, 10)
assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 0
assert :atomics.get(atomic_ref, 1) == 0
end

test "if throttling mechanism works properly", context do
toilet = Toilet.new(100, :buffers, context.responsible_process, 10)
{:ok, toilet} = Toilet.fill(toilet, 10)
assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet
{:ok, toilet} = Toilet.fill(toilet, 5)
assert {_module, _counter, _capacity, _pid, _throttling_factor, 5} = toilet
{:ok, toilet} = Toilet.fill(toilet, 80)
assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet
{:ok, toilet} = Toilet.fill(toilet, 9)
assert {_module, _counter, _capacity, _pid, _throttling_factor, 9} = toilet
{:overflow, toilet} = Toilet.fill(toilet, 11)
assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet
end
end
6 changes: 4 additions & 2 deletions test/membrane/core/element_test.exs
@@ -80,9 +80,11 @@ defmodule Membrane.Core.ElementTest do
toilet_capacity: nil,
target_queue_size: nil,
auto_demand_size: nil,
min_demand_factor: nil
min_demand_factor: nil,
throttling_factor: 1
},
child: :this
child: :this,
pid: self()
},
%Endpoint{pad_spec: :output, pad_ref: :output, pid: self(), child: :other},
%{
62 changes: 62 additions & 0 deletions test/membrane/integration/distributed_pipeline_test.exs
@@ -0,0 +1,62 @@
defmodule Membrane.Integration.DistributedPipelineTest do
use ExUnit.Case
import Membrane.ParentSpec
import Membrane.Testing.Assertions
alias Membrane.ParentSpec

alias Membrane.Support.Distributed.{Sink, Source}

setup do
hostname = start_nodes()
on_exit(fn -> kill_node(hostname) end)
end

test "if distributed pipeline works properly" do
{:ok, pid} = Membrane.Testing.Pipeline.start([])

assert_pipeline_playback_changed(pid, _, :playing)

Membrane.Testing.Pipeline.execute_actions(pid, playback: :stopped)

assert_pipeline_playback_changed(pid, _, :stopped)

Membrane.Testing.Pipeline.execute_actions(pid,
spec: %ParentSpec{
children: [
source: %Source{output: [1, 2, 3, 4, 5]}
],
node: :"[email protected]"
}
)

Membrane.Testing.Pipeline.execute_actions(pid,
spec: %ParentSpec{
children: [
sink: Sink
],
links: [
link(:source)
|> via_in(:input, toilet_capacity: 100, throttling_factor: 50)
|> to(:sink)
],
node: :"[email protected]"
}
)

Membrane.Testing.Pipeline.execute_actions(pid, playback: :playing)
assert_pipeline_playback_changed(pid, _, :playing)
assert_end_of_stream(pid, :sink)
end

defp start_nodes() do
System.cmd("epmd", ["-daemon"])
{:ok, _pid} = Node.start(:"first@127.0.0.1", :longnames)
{:ok, _pid, hostname} = :peer.start(%{host: ~c"127.0.0.1", name: :second})
:rpc.block_call(hostname, :code, :add_paths, [:code.get_path()])
hostname
end

defp kill_node(node) do
:rpc.call(node, :init, :stop, [])
end
end