From edbd4515ff881700c8307e6daafb5652521d1233 Mon Sep 17 00:00:00 2001 From: Andy Dunstall Date: Wed, 17 Jul 2024 06:03:17 +0100 Subject: [PATCH] gossip: move gossip config under cluster Moves 'gossip' config to 'cluster.gossip', since gossip is part of the cluster configuration. --- docs/server/server.md | 110 +++++++++--------- pikotest/cluster/node.go | 10 +- pkg/gossip/config.go | 14 ++- server/config/config.go | 218 +++++++++++++++++------------------ server/config/config_test.go | 104 ++++++++--------- server/server.go | 12 +- 6 files changed, 235 insertions(+), 233 deletions(-) diff --git a/docs/server/server.md b/docs/server/server.md index 5194b8e..cb2029a 100644 --- a/docs/server/server.md +++ b/docs/server/server.md @@ -50,39 +50,6 @@ The server supports the following YAML configuration (where most parameters have corresponding command line flags): ``` -cluster: - # A unique identifier for the node in the cluster. - # - # By default a random ID will be generated for the node. - node_id: "" - - # A prefix for the node ID. - # - # Piko will generate a unique random identifier for the node and append it to - # the given prefix. - # - # Such as you could use the node or pod name as a prefix, then add a unique - # identifier to ensure the node ID is unique across restarts. - node_id_prefix: "" - - # A list of addresses of members in the cluster to join. - # - # This may be either addresses of specific nodes, such as - # '--cluster.join 10.26.104.14,10.26.104.75', or a domain that resolves to - # the addresses of the nodes in the cluster (e.g. a Kubernetes headless - # service), such as '--cluster.join piko.prod-piko-ns'. - # - # Each address must include the host, and may optionally include a port. If no - # port is given, the gossip port of this node is used. - # - # Note each node propagates membership information to the other known nodes, - # so the initial set of configured members only needs to be a subset of nodes. - join: [] - - # Whether the server node should abort if it is configured with more than one - # node to join (excluding itself) but fails to join any members. - abort_if_join_fails: true - proxy: # The host/port to listen for incoming proxy connections. # @@ -203,35 +170,68 @@ upstream: # Path to the PEM encoded key file. key: "" -gossip: - # The host/port to listen for inter-node gossip traffic. +cluster: + # A unique identifier for the node in the cluster. # - # If the host is unspecified it defaults to all listeners, such as - # '--gossip.bind-addr :8003' will listen on '0.0.0.0:8003'. - bind_addr: ":8003" + # By default a random ID will be generated for the node. + node_id: "" - # Gossip listen address to advertise to other nodes in the cluster. This is the - # address other nodes will used to gossip with the node. + # A prefix for the node ID. # - # Such as if the listen address is ':8003', the advertised address may be - # '10.26.104.45:8003' or 'node1.cluster:8003'. + # Piko will generate a unique random identifier for the node and append it to + # the given prefix. # - # By default, if the bind address includes an IP to bind to that will be used. - # If the bind address does not include an IP (such as ':8003') the nodes - # private IP will be used, such as a bind address of ':8003' may have an - # advertise address of '10.26.104.14:8003'. - advertise_addr: "" + # Such as you could use the node or pod name as a prefix, then add a unique + # identifier to ensure the node ID is unique across restarts. + node_id_prefix: "" - # The interval to initiate rounds of gossip. + # A list of addresses of members in the cluster to join. # - # Each gossip round selects another known node to synchronize with.`, - interval: 500ms - - # The maximum size of any packet sent. + # This may be either addresses of specific nodes, such as + # '--cluster.join 10.26.104.14,10.26.104.75', or a domain that resolves to + # the addresses of the nodes in the cluster (e.g. a Kubernetes headless + # service), such as '--cluster.join piko.prod-piko-ns'. # - # Depending on your networks MTU you may be able to increase to include more data - # in each packet. - max_packet_size: 1400 + # Each address must include the host, and may optionally include a port. If no + # port is given, the gossip port of this node is used. + # + # Note each node propagates membership information to the other known nodes, + # so the initial set of configured members only needs to be a subset of nodes. + join: [] + + # Whether the server node should abort if it is configured with more than one + # node to join (excluding itself) but fails to join any members. + abort_if_join_fails: true + + gossip: + # The host/port to listen for inter-node gossip traffic. + # + # If the host is unspecified it defaults to all listeners, such as + # a bind address of ':8003' will listen on '0.0.0.0:8003'. + bind_addr: ":8003" + + # Gossip listen address to advertise to other nodes in the cluster. This is the + # address other nodes will used to gossip with the node. + # + # Such as if the listen address is ':8003', the advertised address may be + # '10.26.104.45:8003' or 'node1.cluster:8003'. + # + # By default, if the bind address includes an IP to bind to that will be used. + # If the bind address does not include an IP (such as ':8003') the nodes + # private IP will be used, such as a bind address of ':8003' may have an + # advertise address of '10.26.104.14:8003'. + advertise_addr: "" + + # The interval to initiate rounds of gossip. + # + # Each gossip round selects another known node to synchronize with.`, + interval: 100ms + + # The maximum size of any packet sent. + # + # Depending on your networks MTU you may be able to increase to include more data + # in each packet. + max_packet_size: 1400 admin: # The host/port to listen for incoming admin connections. diff --git a/pikotest/cluster/node.go b/pikotest/cluster/node.go index 275f337..d978952 100644 --- a/pikotest/cluster/node.go +++ b/pikotest/cluster/node.go @@ -31,13 +31,13 @@ func NewNode(opts ...Option) *Node { } conf := config.Default() - conf.Cluster.NodeID = cluster.GenerateNodeID() - conf.Cluster.Join = options.join conf.Proxy.BindAddr = "127.0.0.1:0" conf.Upstream.BindAddr = "127.0.0.1:0" conf.Admin.BindAddr = "127.0.0.1:0" - conf.Gossip.BindAddr = "127.0.0.1:0" - conf.Gossip.Interval = time.Millisecond * 10 + conf.Cluster.NodeID = cluster.GenerateNodeID() + conf.Cluster.Join = options.join + conf.Cluster.Gossip.BindAddr = "127.0.0.1:0" + conf.Cluster.Gossip.Interval = time.Millisecond * 10 conf.Proxy.Auth = options.authConfig conf.Upstream.Auth = options.authConfig conf.Admin.Auth = options.authConfig @@ -123,7 +123,7 @@ func (n *Node) AdminAddr() string { } func (n *Node) GossipAddr() string { - return n.server.Config().Gossip.AdvertiseAddr + return n.server.Config().Cluster.Gossip.AdvertiseAddr } func (n *Node) ClusterState() *cluster.State { diff --git a/pkg/gossip/config.go b/pkg/gossip/config.go index bdd6a41..1aedb8b 100644 --- a/pkg/gossip/config.go +++ b/pkg/gossip/config.go @@ -34,21 +34,23 @@ func (c *Config) Validate() error { return nil } -func (c *Config) RegisterFlags(fs *pflag.FlagSet) { +func (c *Config) RegisterFlags(fs *pflag.FlagSet, prefix string) { + prefix = prefix + ".gossip." + fs.StringVar( &c.BindAddr, - "gossip.bind-addr", + prefix+"bind-addr", c.BindAddr, ` The host/port to listen for inter-node gossip traffic. If the host is unspecified it defaults to all listeners, such as -'--gossip.bind-addr :8003' will listen on '0.0.0.0:8003'`, +a bind address ':8003' will listen on '0.0.0.0:8003'`, ) fs.StringVar( &c.AdvertiseAddr, - "gossip.advertise-addr", + prefix+"advertise-addr", c.AdvertiseAddr, ` Gossip listen address to advertise to other nodes in the cluster. This is the @@ -65,7 +67,7 @@ advertise address of '10.26.104.14:8003'.`, fs.DurationVar( &c.Interval, - "gossip.interval", + prefix+"interval", c.Interval, ` The interval to initiate rounds of gossip. @@ -75,7 +77,7 @@ Each gossip round selects another known node to synchronize with.`, fs.IntVar( &c.MaxPacketSize, - "gossip.max-packet-size", + prefix+"max-packet-size", c.MaxPacketSize, ` The maximum size of any packet sent. diff --git a/server/config/config.go b/server/config/config.go index 74bc61e..0b797b8 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -11,98 +11,6 @@ import ( "github.com/andydunstall/piko/pkg/log" ) -type ClusterConfig struct { - // NodeID is a unique identifier for this node in the cluster. - NodeID string `json:"node_id" yaml:"node_id"` - - // NodeIDPrefix is a node ID prefix, where Piko will generate the rest of - // the node ID to ensure uniqueness. - NodeIDPrefix string `json:"node_id_prefix" yaml:"node_id_prefix"` - - // Join contains a list of addresses of members in the cluster to join. - Join []string `json:"join" yaml:"join"` - - // JoinTimeout is the time to keep trying to join the cluster before - // failing. - JoinTimeout time.Duration `json:"join_timeout" yaml:"join_timeout"` - - AbortIfJoinFails bool `json:"abort_if_join_fails" yaml:"abort_if_join_fails"` -} - -func (c *ClusterConfig) Validate() error { - if c.NodeID == "" { - return fmt.Errorf("missing node id") - } - if c.JoinTimeout == 0 { - return fmt.Errorf("missing join timeout") - } - - return nil -} - -func (c *ClusterConfig) RegisterFlags(fs *pflag.FlagSet) { - fs.StringVar( - &c.NodeID, - "cluster.node-id", - c.NodeID, - ` -A unique identifier for the node in the cluster. - -By default a random ID will be generated for the node.`, - ) - - fs.StringVar( - &c.NodeIDPrefix, - "cluster.node-id-prefix", - c.NodeIDPrefix, - ` -A prefix for the node ID. - -Piko will generate a unique random identifier for the node and append it to -the given prefix. - -Such as you could use the node or pod name as a prefix, then add a unique -identifier to ensure the node ID is unique across restarts.`, - ) - - fs.StringSliceVar( - &c.Join, - "cluster.join", - c.Join, - ` -A list of addresses of members in the cluster to join. - -This may be either addresses of specific nodes, such as -'--cluster.join 10.26.104.14,10.26.104.75', or a domain that resolves to -the addresses of the nodes in the cluster (e.g. a Kubernetes headless -service), such as '--cluster.join piko.prod-piko-ns'. - -Each address must include the host, and may optionally include a port. If no -port is given, the gossip port of this node is used. - -Note each node propagates membership information to the other known nodes, -so the initial set of configured members only needs to be a subset of nodes.`, - ) - - fs.DurationVar( - &c.JoinTimeout, - "cluster.join-timeout", - c.JoinTimeout, - ` -The timeout to attempt to join an existing cluster when 'cluster.join' is -set.`, - ) - - fs.BoolVar( - &c.AbortIfJoinFails, - "cluster.abort-if-join-fails", - c.AbortIfJoinFails, - ` -Whether the server node should abort if it is configured with more than one -node to join (excluding itself) but fails to join any members.`, - ) -} - // HTTPConfig contains generic configuration for the HTTP servers. type HTTPConfig struct { // ReadTimeout is the maximum duration for reading the entire @@ -373,6 +281,106 @@ advertise address of '10.26.104.14:8002'.`, c.TLS.RegisterFlags(fs, "admin") } +type ClusterConfig struct { + // NodeID is a unique identifier for this node in the cluster. + NodeID string `json:"node_id" yaml:"node_id"` + + // NodeIDPrefix is a node ID prefix, where Piko will generate the rest of + // the node ID to ensure uniqueness. + NodeIDPrefix string `json:"node_id_prefix" yaml:"node_id_prefix"` + + // Join contains a list of addresses of members in the cluster to join. + Join []string `json:"join" yaml:"join"` + + // JoinTimeout is the time to keep trying to join the cluster before + // failing. + JoinTimeout time.Duration `json:"join_timeout" yaml:"join_timeout"` + + AbortIfJoinFails bool `json:"abort_if_join_fails" yaml:"abort_if_join_fails"` + + Gossip gossip.Config `json:"gossip" yaml:"gossip"` +} + +func (c *ClusterConfig) Validate() error { + if c.NodeID == "" { + return fmt.Errorf("missing node id") + } + if c.JoinTimeout == 0 { + return fmt.Errorf("missing join timeout") + } + + if err := c.Gossip.Validate(); err != nil { + return fmt.Errorf("gossip: %w", err) + } + + return nil +} + +func (c *ClusterConfig) RegisterFlags(fs *pflag.FlagSet) { + fs.StringVar( + &c.NodeID, + "cluster.node-id", + c.NodeID, + ` +A unique identifier for the node in the cluster. + +By default a random ID will be generated for the node.`, + ) + + fs.StringVar( + &c.NodeIDPrefix, + "cluster.node-id-prefix", + c.NodeIDPrefix, + ` +A prefix for the node ID. + +Piko will generate a unique random identifier for the node and append it to +the given prefix. + +Such as you could use the node or pod name as a prefix, then add a unique +identifier to ensure the node ID is unique across restarts.`, + ) + + fs.StringSliceVar( + &c.Join, + "cluster.join", + c.Join, + ` +A list of addresses of members in the cluster to join. + +This may be either addresses of specific nodes, such as +'--cluster.join 10.26.104.14,10.26.104.75', or a domain that resolves to +the addresses of the nodes in the cluster (e.g. a Kubernetes headless +service), such as '--cluster.join piko.prod-piko-ns'. + +Each address must include the host, and may optionally include a port. If no +port is given, the gossip port of this node is used. + +Note each node propagates membership information to the other known nodes, +so the initial set of configured members only needs to be a subset of nodes.`, + ) + + fs.DurationVar( + &c.JoinTimeout, + "cluster.join-timeout", + c.JoinTimeout, + ` +The timeout to attempt to join an existing cluster when 'cluster.join' is +set.`, + ) + + fs.BoolVar( + &c.AbortIfJoinFails, + "cluster.abort-if-join-fails", + c.AbortIfJoinFails, + ` +Whether the server node should abort if it is configured with more than one +node to join (excluding itself) but fails to join any members.`, + ) + + c.Gossip.RegisterFlags(fs, "cluster") +} + type UsageConfig struct { // Disable indicates whether to disable anonymous usage collection. Disable bool `json:"disable" yaml:"disable"` @@ -393,15 +401,13 @@ architecture, requests processed and upstreams registered.`, } type Config struct { - Cluster ClusterConfig `json:"cluster" yaml:"cluster"` - Proxy ProxyConfig `json:"proxy" yaml:"proxy"` Upstream UpstreamConfig `json:"upstream" yaml:"upstream"` Admin AdminConfig `json:"admin" yaml:"admin"` - Gossip gossip.Config `json:"gossip" yaml:"gossip"` + Cluster ClusterConfig `json:"cluster" yaml:"cluster"` Usage UsageConfig `json:"usage" yaml:"usage"` @@ -415,10 +421,6 @@ type Config struct { func Default() *Config { return &Config{ - Cluster: ClusterConfig{ - JoinTimeout: time.Minute, - AbortIfJoinFails: true, - }, Proxy: ProxyConfig{ BindAddr: ":8000", Timeout: time.Second * 30, @@ -437,10 +439,14 @@ func Default() *Config { Admin: AdminConfig{ BindAddr: ":8002", }, - Gossip: gossip.Config{ - BindAddr: ":8003", - Interval: time.Millisecond * 100, - MaxPacketSize: 1400, + Cluster: ClusterConfig{ + JoinTimeout: time.Minute, + AbortIfJoinFails: true, + Gossip: gossip.Config{ + BindAddr: ":8003", + Interval: time.Millisecond * 100, + MaxPacketSize: 1400, + }, }, Log: log.Config{ Level: "info", @@ -466,10 +472,6 @@ func (c *Config) Validate() error { return fmt.Errorf("admin: %w", err) } - if err := c.Gossip.Validate(); err != nil { - return fmt.Errorf("gossip: %w", err) - } - if err := c.Log.Validate(); err != nil { return fmt.Errorf("log: %w", err) } @@ -490,8 +492,6 @@ func (c *Config) RegisterFlags(fs *pflag.FlagSet) { c.Admin.RegisterFlags(fs) - c.Gossip.RegisterFlags(fs) - c.Usage.RegisterFlags(fs) c.Log.RegisterFlags(fs) diff --git a/server/config/config_test.go b/server/config/config_test.go index c35e6de..f8b4550 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -24,15 +24,6 @@ func TestConfig_Default(t *testing.T) { // Tests loading the server configuration from YAML. func TestConfig_LoadYAML(t *testing.T) { yaml := ` -cluster: - node_id: "my-node" - join: - - 10.26.104.12:8003 - - 10.26.104.73:8003 - - 10.26.104.28:8003 - join_timeout: 2m - abort_if_join_fails: true - proxy: bind_addr: 10.15.104.25:8000 advertise_addr: 1.2.3.4:8000 @@ -90,11 +81,20 @@ admin: cert: /piko/cert.pem key: /piko/key.pem -gossip: - bind_addr: 10.15.104.25:8003 - advertise_addr: 1.2.3.4:8003 - interval: 100ms - max_packet_size: 1400 +cluster: + node_id: "my-node" + join: + - 10.26.104.12:8003 + - 10.26.104.73:8003 + - 10.26.104.28:8003 + join_timeout: 2m + abort_if_join_fails: true + + gossip: + bind_addr: 10.15.104.25:8003 + advertise_addr: 1.2.3.4:8003 + interval: 100ms + max_packet_size: 1400 usage: disable: true @@ -119,16 +119,6 @@ grace_period: 2m assert.NoError(t, config.Load(&loadedConf, f.Name(), false)) expectedConf := Config{ - Cluster: ClusterConfig{ - NodeID: "my-node", - Join: []string{ - "10.26.104.12:8003", - "10.26.104.73:8003", - "10.26.104.28:8003", - }, - JoinTimeout: 2 * time.Minute, - AbortIfJoinFails: true, - }, Proxy: ProxyConfig{ BindAddr: "10.15.104.25:8000", AdvertiseAddr: "1.2.3.4:8000", @@ -186,11 +176,21 @@ grace_period: 2m Key: "/piko/key.pem", }, }, - Gossip: gossip.Config{ - BindAddr: "10.15.104.25:8003", - AdvertiseAddr: "1.2.3.4:8003", - Interval: time.Millisecond * 100, - MaxPacketSize: 1400, + Cluster: ClusterConfig{ + NodeID: "my-node", + Join: []string{ + "10.26.104.12:8003", + "10.26.104.73:8003", + "10.26.104.28:8003", + }, + JoinTimeout: 2 * time.Minute, + AbortIfJoinFails: true, + Gossip: gossip.Config{ + BindAddr: "10.15.104.25:8003", + AdvertiseAddr: "1.2.3.4:8003", + Interval: time.Millisecond * 100, + MaxPacketSize: 1400, + }, }, Usage: UsageConfig{ Disable: true, @@ -210,10 +210,6 @@ grace_period: 2m // Tests loading the server configuration from flags. func TestConfig_LoadFlags(t *testing.T) { args := []string{ - "--cluster.node-id", "my-node", - "--cluster.join", "10.26.104.12:8003,10.26.104.73:8003,10.26.104.28:8003", - "--cluster.join-timeout", "2m", - "--cluster.abort-if-join-fails", "--proxy.bind-addr", "10.15.104.25:8000", "--proxy.advertise-addr", "1.2.3.4:8000", "--proxy.timeout", "20s", @@ -251,10 +247,14 @@ func TestConfig_LoadFlags(t *testing.T) { "--admin.tls.enabled", "--admin.tls.cert", "/piko/cert.pem", "--admin.tls.key", "/piko/key.pem", - "--gossip.bind-addr", "10.15.104.25:8003", - "--gossip.advertise-addr", "1.2.3.4:8003", - "--gossip.interval", "100ms", - "--gossip.max-packet-size", "1400", + "--cluster.node-id", "my-node", + "--cluster.join", "10.26.104.12:8003,10.26.104.73:8003,10.26.104.28:8003", + "--cluster.join-timeout", "2m", + "--cluster.abort-if-join-fails", + "--cluster.gossip.bind-addr", "10.15.104.25:8003", + "--cluster.gossip.advertise-addr", "1.2.3.4:8003", + "--cluster.gossip.interval", "100ms", + "--cluster.gossip.max-packet-size", "1400", "--usage.disable", "--log.level", "info", "--log.subsystems", "foo,bar", @@ -269,16 +269,6 @@ func TestConfig_LoadFlags(t *testing.T) { assert.NoError(t, fs.Parse(args)) expectedConf := Config{ - Cluster: ClusterConfig{ - NodeID: "my-node", - Join: []string{ - "10.26.104.12:8003", - "10.26.104.73:8003", - "10.26.104.28:8003", - }, - JoinTimeout: 2 * time.Minute, - AbortIfJoinFails: true, - }, Proxy: ProxyConfig{ BindAddr: "10.15.104.25:8000", AdvertiseAddr: "1.2.3.4:8000", @@ -336,11 +326,21 @@ func TestConfig_LoadFlags(t *testing.T) { Key: "/piko/key.pem", }, }, - Gossip: gossip.Config{ - BindAddr: "10.15.104.25:8003", - AdvertiseAddr: "1.2.3.4:8003", - Interval: time.Millisecond * 100, - MaxPacketSize: 1400, + Cluster: ClusterConfig{ + NodeID: "my-node", + Join: []string{ + "10.26.104.12:8003", + "10.26.104.73:8003", + "10.26.104.28:8003", + }, + JoinTimeout: 2 * time.Minute, + AbortIfJoinFails: true, + Gossip: gossip.Config{ + BindAddr: "10.15.104.25:8003", + AdvertiseAddr: "1.2.3.4:8003", + Interval: time.Millisecond * 100, + MaxPacketSize: 1400, + }, }, Usage: UsageConfig{ Disable: true, diff --git a/server/server.go b/server/server.go index 9c2b0ed..34869d0 100644 --- a/server/server.go +++ b/server/server.go @@ -338,9 +338,9 @@ func (s *Server) Wait(ctx context.Context) bool { } func (s *Server) startGossip() error { - gossipStreamLn, err := net.Listen("tcp", s.conf.Gossip.BindAddr) + gossipStreamLn, err := net.Listen("tcp", s.conf.Cluster.Gossip.BindAddr) if err != nil { - return fmt.Errorf("listen: %s: %w", s.conf.Gossip.BindAddr, err) + return fmt.Errorf("listen: %s: %w", s.conf.Cluster.Gossip.BindAddr, err) } gossipPacketLn, err := net.ListenUDP("udp", &net.UDPAddr{ @@ -348,10 +348,10 @@ func (s *Server) startGossip() error { Port: gossipStreamLn.Addr().(*net.TCPAddr).Port, }) if err != nil { - return fmt.Errorf("listen: %s: %w", s.conf.Gossip.BindAddr, err) + return fmt.Errorf("listen: %s: %w", s.conf.Cluster.Gossip.BindAddr, err) } - if s.conf.Gossip.AdvertiseAddr == "" { + if s.conf.Cluster.Gossip.AdvertiseAddr == "" { advertiseAddr, err := advertiseAddrFromListenAddr( gossipStreamLn.Addr().String(), ) @@ -359,14 +359,14 @@ func (s *Server) startGossip() error { // Should never happen. panic("invalid listen address: " + err.Error()) } - s.conf.Gossip.AdvertiseAddr = advertiseAddr + s.conf.Cluster.Gossip.AdvertiseAddr = advertiseAddr } s.gossiper = gossip.NewGossip( s.clusterState, gossipStreamLn, gossipPacketLn, - &s.conf.Gossip, + &s.conf.Cluster.Gossip, s.logger, ) s.gossiper.Metrics().Register(s.registry)