diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 2262cf8a8..805ef44e7 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -158,7 +158,8 @@ defmodule Membrane.Core.Bin.PadController do :target_queue_size, :min_demand_factor, :auto_demand_size, - :toilet_capacity + :toilet_capacity, + :throttling_factor ] do external_value || internal_value else diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 95029cbb0..e5c756c6e 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -64,6 +64,7 @@ defmodule Membrane.Core.Element do node: #{node}, module: #{inspect(module)}, element options: #{inspect(user_options)}, + method: #{method} """) # rpc if necessary diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index a79c80d0a..21f1cd58a 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -109,16 +109,24 @@ defmodule Membrane.Core.Element.DemandHandler do PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) end - def handle_outgoing_buffers(pad_ref, %{mode: :push, toilet: toilet} = data, buffers, state) + def handle_outgoing_buffers( + pad_ref, + %{ + mode: :push, + toilet: toilet + } = data, + buffers, + state + ) when toilet != nil do %{other_demand_unit: other_demand_unit} = data buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers) case Toilet.fill(toilet, buf_size) do - :ok -> - state + {:ok, toilet} -> + PadModel.set_data!(state, pad_ref, :toilet, toilet) - :overflow -> + {:overflow, _toilet} -> # if the toilet has overflowed, we remove it so it didn't overflow again # and let the parent handle that situation by unlinking this output pad or crashing PadModel.set_data!(state, pad_ref, :toilet, nil) diff --git a/lib/membrane/core/element/lifecycle_controller.ex b/lib/membrane/core/element/lifecycle_controller.ex index b228a0187..4f154a3f2 100644 --- a/lib/membrane/core/element/lifecycle_controller.ex +++ b/lib/membrane/core/element/lifecycle_controller.ex @@ -116,6 +116,7 @@ defmodule Membrane.Core.Element.LifecycleController do @impl PlaybackHandler def handle_playback_state(old_playback_state, new_playback_state, state) do require CallbackContext.PlaybackChange + context = &CallbackContext.PlaybackChange.from_state/1 callback = PlaybackHandler.state_change_callback(old_playback_state, new_playback_state) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 67528c0e5..badea6789 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -68,9 +68,16 @@ defmodule Membrane.Core.Element.PadController do :ok = Child.PadController.validate_pad_being_linked!(endpoint.pad_ref, direction, info, state) toilet = - if direction == :input, - do: Toilet.new(endpoint.pad_props.toilet_capacity, info.demand_unit, self()), - else: nil + if direction == :input do + Toilet.new( + endpoint.pad_props.toilet_capacity, + info.demand_unit, + self(), + endpoint.pad_props.throttling_factor + ) + else + nil + end do_handle_link(endpoint, other_endpoint, info, toilet, link_props, state) end diff --git a/lib/membrane/core/element/toilet.ex b/lib/membrane/core/element/toilet.ex index 11b59e1c0..426787b1f 100644 --- a/lib/membrane/core/element/toilet.ex +++ b/lib/membrane/core/element/toilet.ex @@ -7,35 +7,125 @@ defmodule Membrane.Core.Element.Toilet do require Membrane.Logger - @opaque t :: {__MODULE__, :atomics.atomics_ref(), pos_integer, Process.dest()} + defmodule DistributedCounter do + @moduledoc false + + # A module providing a common interface to access and modify a counter used in the toilet implementation. + # The counter uses :atomics module under the hood. + # The module allows to create and modify the value of a counter in the same manner both when the counter is about to be accessed + # from the same node, and from different nodes. + + defmodule Worker do + @moduledoc false + + # This is a GenServer created when the counter is about to be accessed from different nodes - it's running on the same node, + # where the :atomics variable is put, and processes from different nodes can ask it to modify the counter on their behalf. + + use GenServer + + @impl true + def init(parent_pid) do + Process.monitor(parent_pid) + {:ok, nil, :hibernate} + end + + @impl true + def handle_call({:add_get, atomic_ref, value}, _from, _state) do + result = :atomics.add_get(atomic_ref, 1, value) + {:reply, result, nil} + end + + @impl true + def handle_cast({:sub, atomic_ref, value}, _state) do + :atomics.sub(atomic_ref, 1, value) + {:noreply, nil} + end + + @impl true + def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do + {:stop, :normal, state} + end + end + + @type t :: {pid(), :atomics.atomics_ref()} + + @spec new() :: t + def new() do + atomic_ref = :atomics.new(1, []) + {:ok, pid} = GenServer.start(Worker, self()) + {pid, atomic_ref} + end + + @spec add_get(t, integer()) :: integer() + def add_get({pid, atomic_ref}, value) when node(pid) == node(self()) do + :atomics.add_get(atomic_ref, 1, value) + end + + def add_get({pid, atomic_ref}, value) do + GenServer.call(pid, {:add_get, atomic_ref, value}) + end + + @spec sub(t, integer()) :: :ok + def sub({pid, atomic_ref}, value) when node(pid) == node(self()) do + :atomics.sub(atomic_ref, 1, value) + end + + def sub({pid, atomic_ref}, value) do + GenServer.cast(pid, {:sub, atomic_ref, value}) + end + end + + @opaque t :: + {__MODULE__, DistributedCounter.t(), pos_integer, Process.dest(), pos_integer(), + non_neg_integer()} @default_capacity_factor 200 - @spec new(pos_integer() | nil, Membrane.Buffer.Metric.unit_t(), Process.dest()) :: t - def new(capacity, demand_unit, responsible_process) do + @spec new( + pos_integer() | nil, + Membrane.Buffer.Metric.unit_t(), + Process.dest(), + pos_integer() + ) :: t + def new(capacity, demand_unit, responsible_process, throttling_factor) do default_capacity = Membrane.Buffer.Metric.from_unit(demand_unit).buffer_size_approximation() * @default_capacity_factor + toilet_ref = DistributedCounter.new() capacity = capacity || default_capacity - {__MODULE__, :atomics.new(1, []), capacity, responsible_process} + {__MODULE__, toilet_ref, capacity, responsible_process, throttling_factor, 0} end - @spec fill(t, non_neg_integer) :: :ok | :overflow - def fill({__MODULE__, atomic, capacity, responsible_process}, amount) do - size = :atomics.add_get(atomic, 1, amount) - - if size > capacity do - overflow(size, capacity, responsible_process) - :overflow + @spec fill(t, non_neg_integer) :: {:ok | :overflow, t} + def fill( + {__MODULE__, counter, capacity, responsible_process, throttling_factor, + unrinsed_buffers_size}, + amount + ) do + if unrinsed_buffers_size + amount < throttling_factor do + {:ok, + {__MODULE__, counter, capacity, responsible_process, throttling_factor, + amount + unrinsed_buffers_size}} else - :ok + size = DistributedCounter.add_get(counter, amount + unrinsed_buffers_size) + + if size > capacity do + overflow(size, capacity, responsible_process) + {:overflow, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}} + else + {:ok, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}} + end end end @spec drain(t, non_neg_integer) :: :ok - def drain({__MODULE__, atomic, _capacity, _responsible_process}, amount) do - :atomics.sub(atomic, 1, amount) + def drain( + {__MODULE__, counter, _capacity, _responsible_process, _throttling_factor, + _unrinsed_buff_size}, + amount + ) do + DistributedCounter.sub(counter, amount) end defp overflow(size, capacity, responsible_process) do diff --git a/lib/membrane/parent_spec.ex b/lib/membrane/parent_spec.ex index 4d7021a31..026627c8d 100644 --- a/lib/membrane/parent_spec.ex +++ b/lib/membrane/parent_spec.ex @@ -355,6 +355,13 @@ defmodule Membrane.ParentSpec do for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future). - `auto_demand_size` - Size of automatically generated demands. Used only for pads working in pull mode with automatic demands. See `t:Membrane.Pad.mode_t/0` and `t:Membrane.Pad.demand_mode_t/0` for more info. + - `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1, + meaning, that the sender will update the toilet with each buffer being sent. Setting that factor for elements, + which are running on the same node, does not have an impact of performance. However, once the sending element and the receiving element are put on different nodes, + the sender updates the toilet with interprocess messages and setting a bigger `throttling_factor` can reduce the number of messages + in the system. + At the same time, setting a greater `throttling_factor` can result in a toilet overflow being detected later. + See the _links_ section of the moduledoc for more information. """ @@ -363,7 +370,8 @@ defmodule Membrane.ParentSpec do toilet_capacity: number | nil, target_queue_size: number | nil, min_demand_factor: number | nil, - auto_demand_size: number | nil + auto_demand_size: number | nil, + throttling_factor: number | nil ) :: link_builder_t() | no_return def via_in(builder, pad, props \\ []) @@ -388,7 +396,8 @@ defmodule Membrane.ParentSpec do target_queue_size: [default: nil], min_demand_factor: [default: nil], auto_demand_size: [default: nil], - toilet_capacity: [default: nil] + toilet_capacity: [default: nil], + throttling_factor: [default: 1] ) |> case do {:ok, props} -> diff --git a/test/membrane/core/element/toilet_test.exs b/test/membrane/core/element/toilet_test.exs new file mode 100644 index 000000000..6aa557525 --- /dev/null +++ b/test/membrane/core/element/toilet_test.exs @@ -0,0 +1,49 @@ +defmodule Membrane.Core.Element.ToiletTest do + use ExUnit.Case + alias Membrane.Core.Element.Toilet + + setup do + [responsible_process: spawn(fn -> nil end)] + end + + test "if toilet is implemented as :atomics for elements put on the same node", context do + toilet = Toilet.new(100, :buffers, context.responsible_process, 1) + + {_module, {_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor, + _unrinsed_buffers} = toilet + + Toilet.fill(toilet, 10) + assert :atomics.get(atomic_ref, 1) == 10 + Toilet.drain(toilet, 10) + assert :atomics.get(atomic_ref, 1) == 0 + end + + test "if the receiving element uses toilet with :atomics and the sending element with a interprocess message, when the toilet is distributed", + context do + toilet = Toilet.new(100, :buffers, context.responsible_process, 1) + + {_module, {counter_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor, + _unrinsed_buffers} = toilet + + Toilet.fill(toilet, 10) + assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 10 + assert :atomics.get(atomic_ref, 1) == 10 + Toilet.drain(toilet, 10) + assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 0 + assert :atomics.get(atomic_ref, 1) == 0 + end + + test "if throttling mechanism works properly", context do + toilet = Toilet.new(100, :buffers, context.responsible_process, 10) + {:ok, toilet} = Toilet.fill(toilet, 10) + assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet + {:ok, toilet} = Toilet.fill(toilet, 5) + assert {_module, _counter, _capacity, _pid, _throttling_factor, 5} = toilet + {:ok, toilet} = Toilet.fill(toilet, 80) + assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet + {:ok, toilet} = Toilet.fill(toilet, 9) + assert {_module, _counter, _capacity, _pid, _throttling_factor, 9} = toilet + {:overflow, toilet} = Toilet.fill(toilet, 11) + assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet + end +end diff --git a/test/membrane/core/element_test.exs b/test/membrane/core/element_test.exs index 353e313c0..c7e220c90 100644 --- a/test/membrane/core/element_test.exs +++ b/test/membrane/core/element_test.exs @@ -80,9 +80,11 @@ defmodule Membrane.Core.ElementTest do toilet_capacity: nil, target_queue_size: nil, auto_demand_size: nil, - min_demand_factor: nil + min_demand_factor: nil, + throttling_factor: 1 }, - child: :this + child: :this, + pid: self() }, %Endpoint{pad_spec: :output, pad_ref: :output, pid: self(), child: :other}, %{ diff --git a/test/membrane/integration/distributed_pipeline_test.exs b/test/membrane/integration/distributed_pipeline_test.exs new file mode 100644 index 000000000..8970fe3a7 --- /dev/null +++ b/test/membrane/integration/distributed_pipeline_test.exs @@ -0,0 +1,62 @@ +defmodule Membrane.Integration.DistributedPipelineTest do + use ExUnit.Case + import Membrane.ParentSpec + import Membrane.Testing.Assertions + alias Membrane.ParentSpec + + alias Membrane.Support.Distributed.{Sink, Source} + + setup do + hostname = start_nodes() + on_exit(fn -> kill_node(hostname) end) + end + + test "if distributed pipeline works properly" do + {:ok, pid} = Membrane.Testing.Pipeline.start([]) + + assert_pipeline_playback_changed(pid, _, :playing) + + Membrane.Testing.Pipeline.execute_actions(pid, playback: :stopped) + + assert_pipeline_playback_changed(pid, _, :stopped) + + Membrane.Testing.Pipeline.execute_actions(pid, + spec: %ParentSpec{ + children: [ + source: %Source{output: [1, 2, 3, 4, 5]} + ], + node: :"first@127.0.0.1" + } + ) + + Membrane.Testing.Pipeline.execute_actions(pid, + spec: %ParentSpec{ + children: [ + sink: Sink + ], + links: [ + link(:source) + |> via_in(:input, toilet_capacity: 100, throttling_factor: 50) + |> to(:sink) + ], + node: :"second@127.0.0.1" + } + ) + + Membrane.Testing.Pipeline.execute_actions(pid, playback: :playing) + assert_pipeline_playback_changed(pid, _, :playing) + assert_end_of_stream(pid, :sink) + end + + defp start_nodes() do + System.cmd("epmd", ["-daemon"]) + {:ok, _pid} = Node.start(:"first@127.0.0.1", :longnames) + {:ok, _pid, hostname} = :peer.start(%{host: ~c"127.0.0.1", name: :second}) + :rpc.block_call(hostname, :code, :add_paths, [:code.get_path()]) + hostname + end + + defp kill_node(node) do + :rpc.call(node, :init, :stop, []) + end +end diff --git a/test/support/distributed.ex b/test/support/distributed.ex new file mode 100644 index 000000000..6e9f9d675 --- /dev/null +++ b/test/support/distributed.ex @@ -0,0 +1,49 @@ +defmodule Membrane.Support.Distributed do + @moduledoc false + defmodule Source do + @moduledoc false + use Membrane.Source + + def_output_pad :output, caps: :any, mode: :push + def_options output: [spec: list(any())] + + @impl true + def handle_init(opts) do + {:ok, opts.output} + end + + @impl true + def handle_prepared_to_playing(_ctx, list) do + {{:ok, caps: {:output, :some}, start_timer: {:timer, Membrane.Time.milliseconds(100)}}, + list} + end + + @impl true + def handle_tick(_timer_id, _context, [first | rest]) do + {{:ok, buffer: {:output, %Membrane.Buffer{payload: first}}}, rest} + end + + @impl true + def handle_tick(_timer_id, _context, []) do + {{:ok, end_of_stream: :output, stop_timer: :timer}, []} + end + end + + defmodule Sink do + @moduledoc false + + use Membrane.Sink + + def_input_pad :input, caps: :any, demand_unit: :buffers, mode: :pull + + @impl true + def handle_prepared_to_playing(_ctx, state) do + {{:ok, demand: {:input, 1}}, state} + end + + @impl true + def handle_write(_pad, _buffer, _ctx, state) do + {{:ok, demand: {:input, 1}}, state} + end + end +end