Skip to content

Commit

Permalink
handle_child_terminated (#894)
Browse files Browse the repository at this point in the history
* Implement handle_child_terminated, bump version to 1.1.2
  • Loading branch information
FelonEkonom authored Oct 18, 2024
1 parent a9b052d commit 0f3e838
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changelog

## 1.1.2
* Remove 'failed to insert a metric' stalker warning [#849](https://github.com/membraneframework/membrane_core/pull/849)
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
* Remove 'failed to insert a metric' stalker warning. [#849](https://github.com/membraneframework/membrane_core/pull/849)

## 1.1.1
* Fix 'table identifier does not refer to an existing ETS table' error when inserting metrics into the observability ETS. [#835](https://github.com/membraneframework/membrane_core/pull/835)
Expand Down
23 changes: 21 additions & 2 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,20 @@ defmodule Membrane.Bin do
state
) :: callback_return

@doc """
Callback invoked after a child terminates.
Terminated child won't be present in the context of this callback. It is allowed to spawn a new
child with the same name.
By default, it does nothing.
"""
@callback handle_child_terminated(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: callback_return

@doc """
Callback invoked upon each timer tick. A timer can be started with `t:Membrane.Bin.Action.start_timer/0`
action.
Expand Down Expand Up @@ -248,7 +262,8 @@ defmodule Membrane.Bin do
handle_tick: 3,
handle_crash_group_down: 3,
handle_terminate_request: 2,
handle_child_pad_removed: 4
handle_child_pad_removed: 4,
handle_child_terminated: 3

@doc PadsSpecs.def_pad_docs(:input, :bin)
defmacro def_input_pad(name, spec) do
Expand Down Expand Up @@ -405,6 +420,9 @@ defmodule Membrane.Bin do
@impl true
def handle_terminate_request(_ctx, state), do: {[terminate: :normal], state}

@impl true
def handle_child_terminated(_child, _ctx, state), do: {[], state}

defoverridable handle_init: 2,
handle_pad_added: 3,
handle_pad_removed: 3,
Expand All @@ -418,7 +436,8 @@ defmodule Membrane.Bin do
handle_child_notification: 4,
handle_parent_notification: 3,
handle_crash_group_down: 3,
handle_terminate_request: 2
handle_terminate_request: 2,
handle_child_terminated: 3
end
end

Expand Down
16 changes: 15 additions & 1 deletion lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -724,10 +724,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)
|> ChildrenModel.delete_child(child_name)

state = exec_handle_child_terminated(child_name, state)

{:ok, state}
else
:error when reason == :normal ->
{:ok, ChildrenModel.delete_child(state, child_name)}
state = ChildrenModel.delete_child(state, child_name)
state = exec_handle_child_terminated(child_name, state)
{:ok, state}

:error when reason == {:shutdown, :membrane_crash_group_kill} ->
raise Membrane.PipelineError,
Expand Down Expand Up @@ -771,4 +775,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do
state = %{state | pending_specs: Map.merge(state.pending_specs, related_specs)}
related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2)
end

defp exec_handle_child_terminated(child_name, state) do
CallbackHandler.exec_and_handle_callback(
:handle_child_terminated,
Component.action_handler(state),
%{context: &Component.context_from_state/1},
[child_name],
state
)
end
end
23 changes: 21 additions & 2 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,20 @@ defmodule Membrane.Pipeline do
state
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked after a child terminates.
Terminated child won't be present in the context of this callback. It is allowed to spawn a new child
with the same name.
By default, it does nothing.
"""
@callback handle_child_terminated(
child :: Child.name(),
context :: CallbackContext.t(),
state
) :: callback_return

@doc """
Callback invoked upon each timer tick. A timer can be started with `Membrane.Pipeline.Action.start_timer`
action.
Expand Down Expand Up @@ -291,7 +305,8 @@ defmodule Membrane.Pipeline do
handle_crash_group_down: 3,
handle_call: 3,
handle_terminate_request: 2,
handle_child_pad_removed: 4
handle_child_pad_removed: 4,
handle_child_terminated: 3

@doc """
Starts the pipeline based on the given module and links it to the current process.
Expand Down Expand Up @@ -542,6 +557,9 @@ defmodule Membrane.Pipeline do
@impl true
def handle_child_setup_completed(_child, _ctx, state), do: {[], state}

@impl true
def handle_child_terminated(_child, _ctx, state), do: {[], state}

@impl true
def handle_child_playing(_child, _ctx, state), do: {[], state}

Expand Down Expand Up @@ -575,7 +593,8 @@ defmodule Membrane.Pipeline do
handle_child_notification: 4,
handle_crash_group_down: 3,
handle_call: 3,
handle_terminate_request: 2
handle_terminate_request: 2,
handle_child_terminated: 3
end
end
end
55 changes: 54 additions & 1 deletion lib/membrane/testing/assertions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ defmodule Membrane.Testing.Assertions do
end

@doc """
Refutes that a crash group within pipeline won't be down within the `timeout` period specified in
Asserts that a crash group within pipeline won't be down within the `timeout` period specified in
milliseconds.
Usage example:
Expand Down Expand Up @@ -541,4 +541,57 @@ defmodule Membrane.Testing.Assertions do
timeout
)
end

[:child_setup_completed, :child_playing, :child_terminated]
|> Enum.map(fn action ->
callback = :"handle_#{action}"
assertion = :"assert_#{action}"
refution = :"refute_#{action}"

@doc """
Asserts that `Membrane.Testing.Pipeline` executed or will execute callback `#{callback}/3`
for a specific child within the `timeout` period specified in milliseconds.
"""
defmacro unquote(assertion)(pipeline, child, timeout \\ @default_timeout) do
callback = unquote(callback)

quote do
child_name_value = unquote(child)

unquote(
assert_receive_from_pipeline(
pipeline,
{callback,
quote do
^child_name_value
end},
timeout
)
)
end
end

@doc """
Asserts that `Membrane.Testing.Pipeline` won't execute callback `#{callback}/3` for
a specific child within the `timeout` period specified in milliseconds.
"""
defmacro unquote(refution)(pipeline, child, timeout \\ @default_timeout) do
callback = unquote(callback)

quote do
child_name_value = unquote(child)

unquote(
refute_receive_from_pipeline(
pipeline,
{callback,
quote do
^child_name_value
end},
timeout
)
)
end
end
end)
end
8 changes: 7 additions & 1 deletion lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,11 @@ defmodule Membrane.Testing.Pipeline do
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end

[:handle_child_setup_completed, :handle_child_playing]
[
:handle_child_setup_completed,
:handle_child_playing,
:handle_child_terminated
]
|> Enum.map(fn callback ->
@impl true
def unquote(callback)(child, ctx, %State{} = state) do
Expand All @@ -419,6 +423,8 @@ defmodule Membrane.Testing.Pipeline do
state
)

:ok = notify_test_process(state.test_process, {unquote(callback), child})

{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
end)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.Mixfile do
use Mix.Project

@version "1.1.1"
@version "1.1.2"
@source_ref "v#{@version}"

def project do
Expand Down
39 changes: 39 additions & 0 deletions test/membrane/integration/callbacks_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Membrane.Integration.CallbacksTest do
use ExUnit.Case, async: true

import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.Testing

defmodule PadlessElement do
use Membrane.Endpoint
end

defmodule PadlessElementPipeline do
use Membrane.Pipeline
alias Membrane.Integration.CallbacksTest.PadlessElement

@impl true
def handle_child_terminated(child_name, ctx, state) do
assert not is_map_key(ctx.children, child_name)
{[spec: child(child_name, PadlessElement)], state}
end
end

test "handle_child_terminated" do
pipeline = Testing.Pipeline.start_link_supervised!(module: PadlessElementPipeline)

Testing.Pipeline.execute_actions(pipeline, spec: child(:element, PadlessElement))
first_pid = Testing.Pipeline.get_child_pid!(pipeline, :element)
refute_child_terminated(pipeline, :element, 500)

Testing.Pipeline.execute_actions(pipeline, remove_children: :element)
assert_child_terminated(pipeline, :element)
second_pid = Testing.Pipeline.get_child_pid!(pipeline, :element)

assert first_pid != second_pid

Testing.Pipeline.terminate(pipeline)
end
end
14 changes: 3 additions & 11 deletions test/membrane/integration/defer_setup_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ defmodule Membrane.Integration.DeferSetupTest do
assert_child_playing(pipeline, bin)
end

assert_grandhild_playing(pipeline, :bin_1, :bin_a)
assert_grandchild_playing(pipeline, :bin_1, :bin_a)

for bin <- [:bin_b, :bin_c] do
refute_grandchild_playing(pipeline, :bin_1, bin)
Expand All @@ -127,7 +127,7 @@ defmodule Membrane.Integration.DeferSetupTest do
complete_grandchild_setup(pipeline, :bin_1, :bin_c)

for bin <- [:bin_b, :bin_c] do
assert_grandhild_playing(pipeline, :bin_1, bin)
assert_grandchild_playing(pipeline, :bin_1, bin)
end

monitor_ref = Process.monitor(pipeline)
Expand All @@ -145,18 +145,10 @@ defmodule Membrane.Integration.DeferSetupTest do
Pipeline.execute_actions(pipeline, notify_child: {child, {:complete_setup, grandchild}})
end

defp assert_child_playing(pipeline, child) do
assert_pipeline_notified(pipeline, child, :handle_playing)
end

defp assert_grandhild_playing(pipeline, child, grandchild) do
defp assert_grandchild_playing(pipeline, child, grandchild) do
assert_pipeline_notified(pipeline, child, {^grandchild, :handle_playing})
end

defp refute_child_playing(pipeline, child) do
refute_pipeline_notified(pipeline, child, :handle_playing)
end

defp refute_grandchild_playing(pipeline, child, grandchild) do
refute_pipeline_notified(pipeline, child, {^grandchild, :handle_playing})
end
Expand Down

0 comments on commit 0f3e838

Please sign in to comment.