Skip to content

Commit

Permalink
server: add gossip interface
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 21, 2024
1 parent 707066f commit 29315ce
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 5 deletions.
106 changes: 104 additions & 2 deletions cli/serverv2/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package serverv2
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/serverv2/config"
"github.com/andydunstall/pico/serverv2/gossip"
"github.com/andydunstall/pico/serverv2/netmap"
adminserver "github.com/andydunstall/pico/serverv2/server/admin"
"github.com/hashicorp/go-sockaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"go.uber.org/zap"
Expand Down Expand Up @@ -42,6 +46,9 @@ Examples:
# Start a Pico server, listening for proxy connections on :7000 and admin
// ocnnections on :9000.
pico server --proxy.bind-addr :8000 --admin.bind-addr :9000
# Start a Pico server and join an existing cluster.
pico server --cluster.members 10.26.104.14,10.26.104.75
`,
}

Expand Down Expand Up @@ -69,6 +76,34 @@ If the host is unspecified it defaults to all listeners, such as
'--admin.bind-addr :8081' will listen on '0.0.0.0:8081'`,
)

cmd.Flags().StringVar(
&conf.Gossip.BindAddr,
"gossip.bind-addr",
":7000",
`
The host/port to listen for inter-node gossip traffic.
If the host is unspecified it defaults to all listeners, such as
'--gossip.bind-addr :7000' will listen on '0.0.0.0:7000'`,
)

cmd.Flags().StringVar(
&conf.Gossip.AdvertiseAddr,
"gossip.advertise-addr",
"",
`
Gossip listen address to advertise to other nodes in the cluster. This is the
address other nodes will used to gossip with the.
Such as if the listen address is ':7000', the advertised address may be
'10.26.104.45:7000' or 'node1.cluster:7000'.
By default, if the bind address includes an IP to bind to that will be used.
If the bind address does not include an IP (such as ':7000') the nodes
private IP will be used, such as a bind address of ':7000' may have an
advertise address of '10.26.104.14:7000'.`,
)

cmd.Flags().StringVar(
&conf.Cluster.NodeID,
"cluster.node-id",
Expand All @@ -78,6 +113,24 @@ A unique identifier for the node in the cluster.
By default a random ID will be generated for the node.`,
)
cmd.Flags().StringSliceVar(
&conf.Cluster.Members,
"cluster.members",
nil,
`
A list of addresses of members in the cluster to join.
This may be either addresses of specific nodes, such as
'--cluster.members 10.26.104.14,10.26.104.75', or a domain that resolves to
the addresses of the nodes in the cluster (e.g. a Kubernetes headless
service), such as '--cluster.members pico.prod-pico-ns'.
Each address must include the host, and may optionally include a port. If no
port is given, the gossip port of this node is used.
Note each node propagates membership information to the other known nodes,
so the initial set of configured members only needs to be a subset of nodes.`,
)

cmd.Flags().StringVar(
&conf.Log.Level,
Expand Down Expand Up @@ -118,6 +171,15 @@ Such as you can enable 'gossip' logs with '--log.subsystems gossip'.`,
conf.Cluster.NodeID = netmap.GenerateNodeID()
}

if conf.Gossip.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Gossip.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Gossip.AdvertiseAddr = advertiseAddr
}

run(&conf, logger)
}

Expand All @@ -130,8 +192,22 @@ func run(conf *config.Config, logger *log.Logger) {
registry := prometheus.NewRegistry()

networkMap := netmap.NewNetworkMap(&netmap.Node{
ID: conf.Cluster.NodeID,
ID: conf.Cluster.NodeID,
GossipAddr: conf.Gossip.AdvertiseAddr,
})
gossip, err := gossip.NewGossip(networkMap)
if err != nil {
logger.Error("failed to start gossip: %w", zap.Error(err))
os.Exit(1)
}
defer gossip.Close()

if len(conf.Cluster.Members) > 0 {
if err := gossip.Join(conf.Cluster.Members); err != nil {
logger.Error("failed to join cluster: %w", zap.Error(err))
os.Exit(1)
}
}

adminServer := adminserver.NewServer(
conf.Admin.BindAddr,
Expand Down Expand Up @@ -174,8 +250,34 @@ func run(conf *config.Config, logger *log.Logger) {

if err := g.Wait(); err != nil {
logger.Error("failed to run server", zap.Error(err))
os.Exit(1)
}

if err := gossip.Leave(); err != nil {
logger.Error("failed to leave gossip", zap.Error(err))
}

logger.Info("shutdown complete")
}

func advertiseAddrFromBindAddr(bindAddr string) (string, error) {
if strings.HasPrefix(bindAddr, ":") {
bindAddr = "0.0.0.0" + bindAddr
}

host, port, err := net.SplitHostPort(bindAddr)
if err != nil {
return "", fmt.Errorf("invalid bind addr: %s: %w", bindAddr, err)
}

if host == "0.0.0.0" {
ip, err := sockaddr.GetPrivateIP()
if err != nil {
return "", fmt.Errorf("get interface addr: %w", err)
}
if ip == "" {
return "", fmt.Errorf("no private ip found")
}
return ip + ":" + port, nil
}
return bindAddr, nil
}
26 changes: 24 additions & 2 deletions serverv2/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "fmt"

type ProxyConfig struct {
// BindAddr is the address to bind to listen for incoming HTTP connections.
BindAddr string `json:"http_addr"`
BindAddr string `json:"bind_addr"`
}

func (c *ProxyConfig) Validate() error {
Expand All @@ -16,7 +16,7 @@ func (c *ProxyConfig) Validate() error {

type AdminConfig struct {
// BindAddr is the address to bind to listen for incoming HTTP connections.
BindAddr string `json:"http_addr"`
BindAddr string `json:"bind_addr"`
}

func (c *AdminConfig) Validate() error {
Expand All @@ -26,9 +26,27 @@ func (c *AdminConfig) Validate() error {
return nil
}

type GossipConfig struct {
// BindAddr is the address to bind to listen for gossip traffic.
BindAddr string `json:"bind_addr"`

// AdvertiseAddr is the address to advertise to other nodes.
AdvertiseAddr string `json:"advertise_addr"`
}

func (c *GossipConfig) Validate() error {
if c.BindAddr == "" {
return fmt.Errorf("missing bind addr")
}
return nil
}

type ClusterConfig struct {
// NodeID is a unique identifier for this node in the cluster.
NodeID string `json:"node_id"`

// Members contians a list of addresses of members in the cluster to join.
Members []string `json:"members"`
}

type LogConfig struct {
Expand All @@ -48,6 +66,7 @@ func (c *LogConfig) Validate() error {
type Config struct {
Proxy ProxyConfig `json:"proxy"`
Admin AdminConfig `json:"admin"`
Gossip GossipConfig `json:"gossip"`
Cluster ClusterConfig `json:"cluster"`
Log LogConfig `json:"log"`
}
Expand All @@ -59,6 +78,9 @@ func (c *Config) Validate() error {
if err := c.Admin.Validate(); err != nil {
return fmt.Errorf("admin: %w", err)
}
if err := c.Gossip.Validate(); err != nil {
return fmt.Errorf("gossip: %w", err)
}
if err := c.Log.Validate(); err != nil {
return fmt.Errorf("log: %w", err)
}
Expand Down
30 changes: 30 additions & 0 deletions serverv2/gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package gossip

import "github.com/andydunstall/pico/serverv2/netmap"

// Gossip is responsible for maintaining the nodes local NetworkMap and
// propagating the state of the local node to the rest of the cluster.
//
// It uses the 'kite' library for cluster membership anti-entropy, where each
// node maintains a local key-value store containing the nodes state which is
// then propagated to the other nodes in the cluster. Therefore Gossip
// manages updating the local key-value for this node, and watching for updates
// to other nodes and adding them to the netmap.
type Gossip struct {
}

func NewGossip(_ *netmap.NetworkMap) (*Gossip, error) {
return &Gossip{}, nil
}

func (g *Gossip) Join(_ []string) error {
return nil
}

func (g *Gossip) Leave() error {
return nil
}

func (g *Gossip) Close() error {
return nil
}
6 changes: 5 additions & 1 deletion serverv2/netmap/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ var (
type Node struct {
// ID is a unique identifier for the node in the cluster.
ID string `json:"id"`

// GossipAddr is the advertised gossip address.
GossipAddr string `json:"gossip_addr"`
}

func (n *Node) Copy() *Node {
return &Node{
ID: n.ID,
ID: n.ID,
GossipAddr: n.GossipAddr,
}
}

Expand Down

0 comments on commit 29315ce

Please sign in to comment.