From 7287c1cde095889f82b233e05bdf29a501995022 Mon Sep 17 00:00:00 2001 From: benonymus Date: Fri, 15 Dec 2023 08:12:00 +0700 Subject: [PATCH 1/3] tweak child_specs --- lib/nsq/connection.ex | 20 +++++++++----------- lib/nsq/connection/supervisor.ex | 5 ++--- lib/nsq/consumer.ex | 7 +++++-- lib/nsq/consumer/supervisor.ex | 5 +---- lib/nsq/message/supervisor.ex | 4 ++-- lib/nsq/producer.ex | 7 +++++-- lib/nsq/producer/supervisor.ex | 2 +- 7 files changed, 25 insertions(+), 25 deletions(-) diff --git a/lib/nsq/connection.ex b/lib/nsq/connection.ex index 75736fe..0753763 100644 --- a/lib/nsq/connection.ex +++ b/lib/nsq/connection.ex @@ -6,6 +6,7 @@ defmodule NSQ.Connection do # ------------------------------------------------------- # # Directives # # ------------------------------------------------------- # + use GenServer alias NSQ.Connection.Command alias NSQ.Connection.Initializer alias NSQ.Connection.MessageHandling @@ -62,18 +63,15 @@ defmodule NSQ.Connection do # ------------------------------------------------------- # # Behaviour Implementation # # ------------------------------------------------------- # - @spec start_link(pid, host_with_port, NSQ.Config.t(), String.t(), String.t(), pid, list) :: + @spec start_link(pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid) :: {:ok, pid} - def start_link( - parent, - nsqd, - config, - topic, - channel, - conn_info_pid, - event_manager_pid, - opts \\ [] - ) do + def start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid]), + do: start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, []]) + + @spec start_link(pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid | list) :: + {:ok, pid} + + def start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts]) do state = %{ @initial_state | parent: parent, diff --git a/lib/nsq/connection/supervisor.ex b/lib/nsq/connection/supervisor.ex index 18c3717..1ea0a1b 100644 --- a/lib/nsq/connection/supervisor.ex +++ b/lib/nsq/connection/supervisor.ex @@ -36,10 +36,9 @@ defmodule NSQ.Connection.Supervisor do # When using nsqlookupd, we expect connections will be naturally # rediscovered if they fail. - config = - [id: conn_id, start: {NSQ.Connection, :start_link, args}, restart: :temporary] ++ opts + opts = [id: conn_id, restart: :temporary] ++ opts - child = Map.new(config) + child = Supervisor.child_spec({NSQ.Connection, args}, opts) Supervisor.start_child(conn_sup_pid, child) end diff --git a/lib/nsq/consumer.ex b/lib/nsq/consumer.ex index b0be15f..4c081f5 100644 --- a/lib/nsq/consumer.ex +++ b/lib/nsq/consumer.ex @@ -131,8 +131,11 @@ defmodule NSQ.Consumer do @doc """ Starts a Consumer process, called via the supervisor. """ - @spec start_link(String.t(), String.t(), NSQ.Config.t(), list) :: {:ok, pid} - def start_link(topic, channel, config, opts \\ []) do + @spec start_link(String.t() | String.t() | NSQ.Config.t()) :: {:ok, pid} + def start_link([topic, channel, config]), do: start_link([topic, channel, config, []]) + + @spec start_link(String.t() | String.t() | NSQ.Config.t() | list) :: {:ok, pid} + def start_link([topic, channel, config, opts]) do {:ok, config} = NSQ.Config.validate(config) {:ok, config} = NSQ.Config.normalize(config) unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}") diff --git a/lib/nsq/consumer/supervisor.ex b/lib/nsq/consumer/supervisor.ex index 79d175b..3d51251 100644 --- a/lib/nsq/consumer/supervisor.ex +++ b/lib/nsq/consumer/supervisor.ex @@ -12,10 +12,7 @@ defmodule NSQ.Consumer.Supervisor do rdy_loop_id = String.to_atom("#{consumer_name}_rdy_loop") children = [ - %{ - id: NSQ.Consumer, - start: {NSQ.Consumer, :start_link, [topic, channel, config, [name: consumer_name]]} - }, + {NSQ.Consumer, [topic, channel, config, [name: consumer_name]]}, Supervisor.child_spec( {Task, fn -> NSQ.Consumer.Connections.discovery_loop(consumer_name) end}, id: discovery_loop_id diff --git a/lib/nsq/message/supervisor.ex b/lib/nsq/message/supervisor.ex index de403c6..8c62b55 100644 --- a/lib/nsq/message/supervisor.ex +++ b/lib/nsq/message/supervisor.ex @@ -17,8 +17,8 @@ defmodule NSQ.Message.Supervisor do def start_child(msg_sup_pid, message, opts \\ []) do # If a message fails, NSQ will handle requeueing. id = message.id <> "-" <> UUID.uuid4(:hex) - config = [id: id, start: {NSQ.Message, :start_link, [message]}, restart: :temporary] ++ opts - child = Map.new(config) + opts = [id: id, restart: :temporary] ++ opts + child = Supervisor.child_spec({NSQ.Message, message}, opts) Supervisor.start_child(msg_sup_pid, child) end diff --git a/lib/nsq/producer.ex b/lib/nsq/producer.ex index 14f6716..1a7a16b 100644 --- a/lib/nsq/producer.ex +++ b/lib/nsq/producer.ex @@ -134,8 +134,11 @@ defmodule NSQ.Producer do # ------------------------------------------------------- # # API Definitions # # ------------------------------------------------------- # - @spec start_link(binary, NSQ.Config.t(), GenServer.options()) :: {:ok, pid} - def start_link(topic, config, genserver_options \\ []) do + @spec start_link([{binary | NSQ.Config.t()}]) :: {:ok, pid} + def start_link([topic, config]), do: start_link([topic, config, []]) + + @spec start_link([binary | NSQ.Config.t() | GenServer.options()]) :: {:ok, pid} + def start_link([topic, config, genserver_options]) do {:ok, config} = NSQ.Config.validate(config || %NSQ.Config{}) {:ok, config} = NSQ.Config.normalize(config) unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}") diff --git a/lib/nsq/producer/supervisor.ex b/lib/nsq/producer/supervisor.ex index d2ef603..5632cee 100644 --- a/lib/nsq/producer/supervisor.ex +++ b/lib/nsq/producer/supervisor.ex @@ -7,7 +7,7 @@ defmodule NSQ.Producer.Supervisor do @impl true def init({topic, config}) do - children = [%{id: NSQ.Producer, start: {NSQ.Producer, :start_link, [topic, config]}}] + children = [{NSQ.Producer, [topic, config]}] Supervisor.init(children, strategy: :one_for_one) end From d2d5e01d4543fe3ddd88581d37a09cc2643e1a5c Mon Sep 17 00:00:00 2001 From: benonymus Date: Fri, 15 Dec 2023 12:53:38 +0700 Subject: [PATCH 2/3] permanent restart for the tasks --- lib/nsq/consumer/supervisor.ex | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/nsq/consumer/supervisor.ex b/lib/nsq/consumer/supervisor.ex index 3d51251..6dd3b7c 100644 --- a/lib/nsq/consumer/supervisor.ex +++ b/lib/nsq/consumer/supervisor.ex @@ -13,13 +13,16 @@ defmodule NSQ.Consumer.Supervisor do children = [ {NSQ.Consumer, [topic, channel, config, [name: consumer_name]]}, + # Tasks have temporary restart policy by default Supervisor.child_spec( {Task, fn -> NSQ.Consumer.Connections.discovery_loop(consumer_name) end}, - id: discovery_loop_id + id: discovery_loop_id, + restart: :permanent ), Supervisor.child_spec( {Task, fn -> NSQ.Consumer.RDY.redistribute_loop(consumer_name) end}, - id: rdy_loop_id + id: rdy_loop_id, + restart: :permanent ) ] From f5bbde38dbbb31d86c8f33ca72bf52c8c021bc8f Mon Sep 17 00:00:00 2001 From: benonymus Date: Sat, 16 Dec 2023 06:48:07 +0700 Subject: [PATCH 3/3] use tuples for start_links and fix typespecs --- lib/nsq/connection.ex | 11 +++++------ lib/nsq/connection/supervisor.ex | 4 ++-- lib/nsq/consumer.ex | 8 ++++---- lib/nsq/consumer/supervisor.ex | 2 +- lib/nsq/producer.ex | 8 ++++---- lib/nsq/producer/supervisor.ex | 2 +- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/lib/nsq/connection.ex b/lib/nsq/connection.ex index 0753763..e2097f1 100644 --- a/lib/nsq/connection.ex +++ b/lib/nsq/connection.ex @@ -63,15 +63,14 @@ defmodule NSQ.Connection do # ------------------------------------------------------- # # Behaviour Implementation # # ------------------------------------------------------- # - @spec start_link(pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid) :: + @spec start_link({pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid}) :: {:ok, pid} - def start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid]), - do: start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, []]) + def start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid}), + do: start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, []}) - @spec start_link(pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid | list) :: + @spec start_link({pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid | list}) :: {:ok, pid} - - def start_link([parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts]) do + def start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts}) do state = %{ @initial_state | parent: parent, diff --git a/lib/nsq/connection/supervisor.ex b/lib/nsq/connection/supervisor.ex index 1ea0a1b..f9be01b 100644 --- a/lib/nsq/connection/supervisor.ex +++ b/lib/nsq/connection/supervisor.ex @@ -22,7 +22,7 @@ defmodule NSQ.Connection.Supervisor do parent_state = parent_state || GenServer.call(parent, :state) conn_sup_pid = parent_state.conn_sup_pid - args = [ + args = { parent, nsqd, parent_state.config, @@ -30,7 +30,7 @@ defmodule NSQ.Connection.Supervisor do parent_state.channel, parent_state.conn_info_pid, parent_state.event_manager_pid - ] + } conn_id = ConnInfo.conn_id(parent, nsqd) diff --git a/lib/nsq/consumer.ex b/lib/nsq/consumer.ex index 4c081f5..c205448 100644 --- a/lib/nsq/consumer.ex +++ b/lib/nsq/consumer.ex @@ -131,11 +131,11 @@ defmodule NSQ.Consumer do @doc """ Starts a Consumer process, called via the supervisor. """ - @spec start_link(String.t() | String.t() | NSQ.Config.t()) :: {:ok, pid} - def start_link([topic, channel, config]), do: start_link([topic, channel, config, []]) + @spec start_link({String.t() | String.t() | NSQ.Config.t()}) :: {:ok, pid} + def start_link({topic, channel, config}), do: start_link({topic, channel, config, []}) - @spec start_link(String.t() | String.t() | NSQ.Config.t() | list) :: {:ok, pid} - def start_link([topic, channel, config, opts]) do + @spec start_link({String.t() | String.t() | NSQ.Config.t() | list}) :: {:ok, pid} + def start_link({topic, channel, config, opts}) do {:ok, config} = NSQ.Config.validate(config) {:ok, config} = NSQ.Config.normalize(config) unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}") diff --git a/lib/nsq/consumer/supervisor.ex b/lib/nsq/consumer/supervisor.ex index 6dd3b7c..ccf14db 100644 --- a/lib/nsq/consumer/supervisor.ex +++ b/lib/nsq/consumer/supervisor.ex @@ -12,7 +12,7 @@ defmodule NSQ.Consumer.Supervisor do rdy_loop_id = String.to_atom("#{consumer_name}_rdy_loop") children = [ - {NSQ.Consumer, [topic, channel, config, [name: consumer_name]]}, + {NSQ.Consumer, {topic, channel, config, [name: consumer_name]}}, # Tasks have temporary restart policy by default Supervisor.child_spec( {Task, fn -> NSQ.Consumer.Connections.discovery_loop(consumer_name) end}, diff --git a/lib/nsq/producer.ex b/lib/nsq/producer.ex index 1a7a16b..966cec5 100644 --- a/lib/nsq/producer.ex +++ b/lib/nsq/producer.ex @@ -134,11 +134,11 @@ defmodule NSQ.Producer do # ------------------------------------------------------- # # API Definitions # # ------------------------------------------------------- # - @spec start_link([{binary | NSQ.Config.t()}]) :: {:ok, pid} - def start_link([topic, config]), do: start_link([topic, config, []]) + @spec start_link({binary(), NSQ.Config.t()}) :: {:ok, pid} + def start_link({topic, config}), do: start_link({topic, config, []}) - @spec start_link([binary | NSQ.Config.t() | GenServer.options()]) :: {:ok, pid} - def start_link([topic, config, genserver_options]) do + @spec start_link({binary, NSQ.Config.t(), GenServer.options()}) :: {:ok, pid} + def start_link({topic, config, genserver_options}) do {:ok, config} = NSQ.Config.validate(config || %NSQ.Config{}) {:ok, config} = NSQ.Config.normalize(config) unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}") diff --git a/lib/nsq/producer/supervisor.ex b/lib/nsq/producer/supervisor.ex index 5632cee..baffdcb 100644 --- a/lib/nsq/producer/supervisor.ex +++ b/lib/nsq/producer/supervisor.ex @@ -7,7 +7,7 @@ defmodule NSQ.Producer.Supervisor do @impl true def init({topic, config}) do - children = [{NSQ.Producer, [topic, config]}] + children = [{NSQ.Producer, {topic, config}}] Supervisor.init(children, strategy: :one_for_one) end