From 0e62918d05d0e009819dcdc8a958567e14427c99 Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 6 Jun 2024 07:03:43 -0700 Subject: [PATCH 1/6] add shutdown timeout options, ensure pids are actually shutdown --- src/gleam/otp/supervisor.gleam | 100 +++++++++++++++++++++++++-- test/gleam/otp/supervisor_test.gleam | 14 ++-- 2 files changed, 104 insertions(+), 10 deletions(-) diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam index 89a81af..eaec0fc 100644 --- a/src/gleam/otp/supervisor.gleam +++ b/src/gleam/otp/supervisor.gleam @@ -1,6 +1,9 @@ // TODO: specify amount of time permitted for shut-down +import gleam/dynamic +import gleam/erlang/atom import gleam/erlang/node.{type Node} import gleam/erlang/process.{type Pid, type Subject} +import gleam/io import gleam/option.{type Option, None, Some} import gleam/otp/actor.{type StartError} import gleam/otp/intensity_tracker.{type IntensityTracker} @@ -40,6 +43,7 @@ pub opaque type ChildSpec(msg, argument, returning) { // TODO: merge this into one field start: fn(argument) -> Result(Subject(msg), StartError), returning: fn(argument, Subject(msg)) -> returning, + shutdown: Shutdown, ) } @@ -57,6 +61,11 @@ type Instruction { StartFrom(Pid) } +type Shutdown { + BrutalKill + Timeout(Int) +} + type State(a) { State( restarts: IntensityTracker, @@ -96,10 +105,67 @@ fn start_child( )) } -// TODO: more sophsiticated stopping of processes. i.e. give supervisors -// more time to shut down. -fn shutdown_child(pid: Pid, _spec: ChildSpec(msg, arg_1, arg_2)) -> Nil { +fn shutdown_child(pid: Pid, spec: ChildSpec(msg, arg_1, arg_2)) { + case spec.shutdown { + BrutalKill -> shutdown_child_brutal_kill(pid) + Timeout(timeout) -> shutdown_child_timeout(pid, timeout) + } +} + +fn shutdown_child_timeout(pid: Pid, timeout: Int) { + let monitor = process.monitor_process(pid) process.send_exit(pid) + + let result = + process.new_selector() + |> process.selecting_process_down(monitor, fn(a) { a }) + |> process.select(timeout) + + case result { + Ok(process.ProcessDown(pid, _reason)) -> { + unlink_flush(pid) + } + + Error(Nil) -> { + process.kill(pid) + + let result = + process.new_selector() + |> process.selecting_process_down(monitor, fn(a) { a }) + |> process.select_forever() + + case result { + process.ProcessDown(pid, _reason) -> unlink_flush(pid) + } + } + } +} + +fn shutdown_child_brutal_kill(pid: Pid) { + let monitor = process.monitor_process(pid) + process.kill(pid) + + let result = + process.new_selector() + |> process.selecting_process_down(monitor, fn(a) { a }) + |> process.select_forever() + + case result { + process.ProcessDown(pid, _reason) -> unlink_flush(pid) + } +} + +fn unlink_flush(pid) { + process.unlink(pid) + let result = + process.new_selector() + |> process.selecting_anything(fn(a) { a }) + |> process.select(0) + + case result { + Error(Nil) -> Nil + Ok(_) -> Nil + } } fn perform_instruction_for_child( @@ -201,7 +267,11 @@ pub fn add( pub fn supervisor( start: fn(argument) -> Result(Subject(msg), StartError), ) -> ChildSpec(msg, argument, argument) { - ChildSpec(start: start, returning: fn(argument, _channel) { argument }) + ChildSpec( + start: start, + returning: fn(argument, _channel) { argument }, + shutdown: Timeout(5000), + ) } /// Prepare a new worker type child. @@ -223,7 +293,11 @@ pub fn supervisor( pub fn worker( start: fn(argument) -> Result(Subject(msg), StartError), ) -> ChildSpec(msg, argument, argument) { - ChildSpec(start: start, returning: fn(argument, _channel) { argument }) + ChildSpec( + start: start, + returning: fn(argument, _channel) { argument }, + shutdown: Timeout(5000), + ) } // TODO: test @@ -237,7 +311,21 @@ pub fn returning( child: ChildSpec(msg, argument_a, argument_b), updater: fn(argument_a, Subject(msg)) -> argument_c, ) -> ChildSpec(msg, argument_a, argument_c) { - ChildSpec(start: child.start, returning: updater) + ChildSpec(start: child.start, returning: updater, shutdown: child.shutdown) +} + +/// Change the shutdown timeout +pub fn shutdown_timeout( + child: ChildSpec(msg, argument_a, argument_b), + timeout: Int, +) -> ChildSpec(msg, argument_a, argument_b) { + ChildSpec(..child, shutdown: Timeout(timeout)) +} + +pub fn shutdown_brutal_kill( + child: ChildSpec(msg, argument_a, argument_b), +) -> ChildSpec(msg, argument_a, argument_b) { + ChildSpec(..child, shutdown: BrutalKill) } fn init( diff --git a/test/gleam/otp/supervisor_test.gleam b/test/gleam/otp/supervisor_test.gleam index da65b67..562baa3 100644 --- a/test/gleam/otp/supervisor_test.gleam +++ b/test/gleam/otp/supervisor_test.gleam @@ -26,6 +26,7 @@ pub fn supervisor_test() { let child = child |> returning(fn(name, _subject) { name + 1 }) + |> supervisor.shutdown_timeout(5) supervisor.start_spec( supervisor.Spec( @@ -43,18 +44,23 @@ pub fn supervisor_test() { |> should.be_ok // Assert children have started - let assert Ok(#(1, p)) = process.receive(subject, 10) - let assert Ok(#(2, _)) = process.receive(subject, 10) - let assert Ok(#(3, _)) = process.receive(subject, 10) + let assert Ok(#(1, pid1)) = process.receive(subject, 10) + let assert Ok(#(2, pid2)) = process.receive(subject, 10) + let assert Ok(#(3, pid3)) = process.receive(subject, 10) let assert Error(Nil) = process.receive(subject, 10) // Kill first child an assert they all restart - process.kill(p) + process.kill(pid1) let assert Ok(#(1, p1)) = process.receive(subject, 10) let assert Ok(#(2, p2)) = process.receive(subject, 10) let assert Ok(#(3, _)) = process.receive(subject, 10) let assert Error(Nil) = process.receive(subject, 10) + // Ensure that the original processes are dead + should.be_false(process.is_alive(pid1)) + should.be_false(process.is_alive(pid2)) + should.be_false(process.is_alive(pid3)) + // Kill second child an assert the following children restart process.kill(p2) let assert Ok(#(2, _)) = process.receive(subject, 10) From b0144d7dd50fce6611865c1f2935d6f0bdf2adae Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 6 Jun 2024 07:05:10 -0700 Subject: [PATCH 2/6] remove unusued --- src/gleam/otp/supervisor.gleam | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam index eaec0fc..5d992e6 100644 --- a/src/gleam/otp/supervisor.gleam +++ b/src/gleam/otp/supervisor.gleam @@ -1,9 +1,6 @@ // TODO: specify amount of time permitted for shut-down -import gleam/dynamic -import gleam/erlang/atom import gleam/erlang/node.{type Node} import gleam/erlang/process.{type Pid, type Subject} -import gleam/io import gleam/option.{type Option, None, Some} import gleam/otp/actor.{type StartError} import gleam/otp/intensity_tracker.{type IntensityTracker} From 23980a4fb8b2d7ed2a97c7fd22ce1803aa4b8c9f Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 6 Jun 2024 07:05:48 -0700 Subject: [PATCH 3/6] better name --- src/gleam/otp/supervisor.gleam | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam index 5d992e6..b97220c 100644 --- a/src/gleam/otp/supervisor.gleam +++ b/src/gleam/otp/supervisor.gleam @@ -40,7 +40,7 @@ pub opaque type ChildSpec(msg, argument, returning) { // TODO: merge this into one field start: fn(argument) -> Result(Subject(msg), StartError), returning: fn(argument, Subject(msg)) -> returning, - shutdown: Shutdown, + shutdown: ShutdownOptions, ) } @@ -58,7 +58,7 @@ type Instruction { StartFrom(Pid) } -type Shutdown { +type ShutdownOptions { BrutalKill Timeout(Int) } From 51d375f4f2a198656732a8c8fcbbf6abc66e8ec6 Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 6 Jun 2024 07:09:35 -0700 Subject: [PATCH 4/6] docs from otp supervisor --- src/gleam/otp/supervisor.gleam | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam index b97220c..7f77e39 100644 --- a/src/gleam/otp/supervisor.gleam +++ b/src/gleam/otp/supervisor.gleam @@ -152,6 +152,8 @@ fn shutdown_child_brutal_kill(pid: Pid) { } } +// We call unlink in order to guarantee that the 'EXIT' has arrived +// from the dead process. See the [unlink](https://www.erlang.org/doc/apps/erts/erlang.html#unlink/1) docs for details. fn unlink_flush(pid) { process.unlink(pid) let result = From 72392991553c15aa43cec461996d45d5e81f03cc Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 7 Jun 2024 07:28:22 -0700 Subject: [PATCH 5/6] use erlang:exit(Pid, shutdown) because normal is ignored from external pids --- src/gleam/otp/supervisor.gleam | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/gleam/otp/supervisor.gleam b/src/gleam/otp/supervisor.gleam index 7f77e39..2f411ba 100644 --- a/src/gleam/otp/supervisor.gleam +++ b/src/gleam/otp/supervisor.gleam @@ -1,4 +1,4 @@ -// TODO: specify amount of time permitted for shut-down +import gleam/erlang/atom import gleam/erlang/node.{type Node} import gleam/erlang/process.{type Pid, type Subject} import gleam/option.{type Option, None, Some} @@ -102,6 +102,9 @@ fn start_child( )) } +@external(erlang, "erlang", "exit") +fn erlang_exit(pid: Pid, reason: atom.Atom) -> Nil + fn shutdown_child(pid: Pid, spec: ChildSpec(msg, arg_1, arg_2)) { case spec.shutdown { BrutalKill -> shutdown_child_brutal_kill(pid) @@ -111,7 +114,7 @@ fn shutdown_child(pid: Pid, spec: ChildSpec(msg, arg_1, arg_2)) { fn shutdown_child_timeout(pid: Pid, timeout: Int) { let monitor = process.monitor_process(pid) - process.send_exit(pid) + erlang_exit(pid, atom.create_from_string("shutdown")) let result = process.new_selector() From 8ca1bd238fdd385c6c3eed7c34e93b7cf9c4194b Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 7 Jun 2024 07:30:49 -0700 Subject: [PATCH 6/6] actors now exit properly --- test/gleam/otp/supervisor_test.gleam | 1 - 1 file changed, 1 deletion(-) diff --git a/test/gleam/otp/supervisor_test.gleam b/test/gleam/otp/supervisor_test.gleam index 562baa3..cc5b1d2 100644 --- a/test/gleam/otp/supervisor_test.gleam +++ b/test/gleam/otp/supervisor_test.gleam @@ -26,7 +26,6 @@ pub fn supervisor_test() { let child = child |> returning(fn(name, _subject) { name + 1 }) - |> supervisor.shutdown_timeout(5) supervisor.start_spec( supervisor.Spec(