From 8f7a1867e83e1ea384b0f410b39ed5474d599174 Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Sat, 11 May 2024 06:58:15 +0100 Subject: [PATCH] cluster: rename 'netmap' to 'cluster state' --- cli/status/{netmap.go => cluster.go} | 52 +-- cli/status/command.go | 6 +- docs/getting-started.md | 4 +- docs/manage/observability.md | 2 +- server/{netmap => cluster}/metrics.go | 17 +- server/{netmap => cluster}/node.go | 2 +- server/cluster/state.go | 304 +++++++++++++++++ .../netmap_test.go => cluster/state_test.go} | 132 ++++---- server/{netmap => cluster}/status.go | 14 +- server/gossip/gossip.go | 22 +- server/gossip/syncer.go | 82 ++--- server/gossip/syncer_test.go | 102 +++--- server/netmap/netmap.go | 305 ------------------ server/proxy/proxy.go | 6 +- server/proxy/proxy_test.go | 32 +- server/proxy/remote.go | 20 +- server/server.go | 14 +- status/client/client.go | 14 +- tests/cluster_test.go | 2 +- 19 files changed, 566 insertions(+), 566 deletions(-) rename cli/status/{netmap.go => cluster.go} (66%) rename server/{netmap => cluster}/metrics.go (50%) rename server/{netmap => cluster}/node.go (99%) create mode 100644 server/cluster/state.go rename server/{netmap/netmap_test.go => cluster/state_test.go} (59%) rename server/{netmap => cluster}/status.go (76%) delete mode 100644 server/netmap/netmap.go diff --git a/cli/status/netmap.go b/cli/status/cluster.go similarity index 66% rename from cli/status/netmap.go rename to cli/status/cluster.go index dccab0d..0604717 100644 --- a/cli/status/netmap.go +++ b/cli/status/cluster.go @@ -6,36 +6,36 @@ import ( "os" "sort" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" "github.com/andydunstall/pico/status/client" "github.com/andydunstall/pico/status/config" yaml "github.com/goccy/go-yaml" "github.com/spf13/cobra" ) -func newNetmapCommand() *cobra.Command { +func newClusterCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "netmap", - Short: "inspect proxy netmap", + Use: "cluster", + Short: "inspect proxy cluster", } - cmd.AddCommand(newNetmapNodesCommand()) - cmd.AddCommand(newNetmapNodeCommand()) + cmd.AddCommand(newClusterNodesCommand()) + cmd.AddCommand(newClusterNodeCommand()) return cmd } -func newNetmapNodesCommand() *cobra.Command { +func newClusterNodesCommand() *cobra.Command { cmd := &cobra.Command{ Use: "nodes", - Short: "inspect netmap nodes", - Long: `Inspect netmap nodes. + Short: "inspect cluster nodes", + Long: `Inspect cluster nodes. Queries the server for the set of nodes the cluster that this node knows about. The output contains the state of each known node. Examples: - pico status netmap nodes + pico status cluster nodes `, } @@ -56,25 +56,25 @@ Pico server URL. This URL should point to the server admin port. os.Exit(1) } - showNetmapNodes(&conf) + showClusterNodes(&conf) } return cmd } -type netmapNodesOutput struct { - Nodes []*netmap.Node `json:"nodes"` +type clusterNodesOutput struct { + Nodes []*cluster.Node `json:"nodes"` } -func showNetmapNodes(conf *config.Config) { +func showClusterNodes(conf *config.Config) { // The URL has already been validated in conf. 
url, _ := url.Parse(conf.Server.URL) client := client.NewClient(url) defer client.Close() - nodes, err := client.NetmapNodes() + nodes, err := client.ClusterNodes() if err != nil { - fmt.Printf("failed to get netmap nodes: %s\n", err.Error()) + fmt.Printf("failed to get cluster nodes: %s\n", err.Error()) os.Exit(1) } @@ -83,29 +83,29 @@ func showNetmapNodes(conf *config.Config) { return nodes[i].ID < nodes[j].ID }) - output := netmapNodesOutput{ + output := clusterNodesOutput{ Nodes: nodes, } b, _ := yaml.Marshal(output) fmt.Println(string(b)) } -func newNetmapNodeCommand() *cobra.Command { +func newClusterNodeCommand() *cobra.Command { cmd := &cobra.Command{ Use: "node", Args: cobra.ExactArgs(1), - Short: "inspect netmap node", - Long: `Inspect a netmap node. + Short: "inspect cluster node", + Long: `Inspect a cluster node. Queries the server for the known state of the node with the given ID. Or use a node ID of 'local' to query the local node. Examples: # Inspect node bbc69214. - pico status netmap node bbc69214 + pico status cluster node bbc69214 # Inspect local node. - pico status netmap node local + pico status cluster node local `, } @@ -126,21 +126,21 @@ Pico server URL. This URL should point to the server admin port. os.Exit(1) } - showNetmapNode(args[0], &conf) + showClusterNode(args[0], &conf) } return cmd } -func showNetmapNode(nodeID string, conf *config.Config) { +func showClusterNode(nodeID string, conf *config.Config) { // The URL has already been validated in conf. url, _ := url.Parse(conf.Server.URL) client := client.NewClient(url) defer client.Close() - node, err := client.NetmapNode(nodeID) + node, err := client.ClusterNode(nodeID) if err != nil { - fmt.Printf("failed to get netmap nodes: %s: %s\n", nodeID, err.Error()) + fmt.Printf("failed to get cluster nodes: %s: %s\n", nodeID, err.Error()) os.Exit(1) } diff --git a/cli/status/command.go b/cli/status/command.go index ee315e9..cc04f09 100644 --- a/cli/status/command.go +++ b/cli/status/command.go @@ -17,8 +17,8 @@ can be used to answer questions such as: See 'status --help' for the availale commands. Examples: - # Inspect the members in the netmap. - pico status netmap members + # Inspect the known nodes in the cluster. + pico status cluster nodes # Inspect the upstream listeners connected to this node. pico status proxy endpoints @@ -29,7 +29,7 @@ Examples: } cmd.AddCommand(newProxyCommand()) - cmd.AddCommand(newNetmapCommand()) + cmd.AddCommand(newClusterCommand()) cmd.AddCommand(newGossipCommand()) return cmd diff --git a/docs/getting-started.md b/docs/getting-started.md index de3ac0a..93eb817 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -33,7 +33,7 @@ the status API, and health checks - Prometheus (`http://localhost:9090`) - Grafana (`http://localhost:3000`) -To verify Pico has started correctly, run `pico status netmap nodes` which +To verify Pico has started correctly, run `pico status cluster nodes` which queries the Pico admin API for the set of known Pico nodes. ## Agent @@ -53,7 +53,7 @@ pico agent --endpoints my-endpoint/localhost:4000 See `pico agent -h` for the available options. -You can verify the endpoint has connected, again run `pico status netmap nodes` +You can verify the endpoint has connected, again run `pico status cluster nodes` and you'll see one of the Pico server nodes reporting endpoint `my-endpoint` has an active connection. 
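For readers consuming the renamed status API programmatically rather than through the CLI, the status client exposes the same data. A minimal sketch using only names introduced in this patch (client.NewClient, ClusterNodes); the admin URL is illustrative:

	package main

	import (
		"fmt"
		"net/url"

		"github.com/andydunstall/pico/status/client"
	)

	func main() {
		// Point at a Pico server admin port (address is illustrative).
		u, _ := url.Parse("http://localhost:8081")
		c := client.NewClient(u)
		defer c.Close()

		// Fetches /status/cluster/nodes, the endpoint the CLI now queries.
		nodes, err := c.ClusterNodes()
		if err != nil {
			fmt.Println("failed to get cluster nodes:", err)
			return
		}
		for _, n := range nodes {
			fmt.Println(n.ID, n.Status)
		}
	}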
diff --git a/docs/manage/observability.md b/docs/manage/observability.md index 2536f1d..058ed42 100644 --- a/docs/manage/observability.md +++ b/docs/manage/observability.md @@ -37,6 +37,6 @@ at `/status` on the admin port that `pico status` then queries. Such as to view the endpoints registers on a server use `pico status proxy endpoints`. Or to inspect the set of known nodes in the -cluster use `pico status netmap nodes`. +cluster use `pico status cluster nodes`. Configure the server URL with `--server`. diff --git a/server/netmap/metrics.go b/server/cluster/metrics.go similarity index 50% rename from server/netmap/metrics.go rename to server/cluster/metrics.go index b3f7be6..1c0133a 100644 --- a/server/netmap/metrics.go +++ b/server/cluster/metrics.go @@ -1,20 +1,21 @@ -package netmap +package cluster import "github.com/prometheus/client_golang/prometheus" type Metrics struct { - // Entries contains the number of entries in the netmap, labelled by + // Nodes contains the number of known nodes in the cluster, labelled by // status. - Entries *prometheus.GaugeVec + Nodes *prometheus.GaugeVec } func NewMetrics() *Metrics { return &Metrics{ - Entries: prometheus.NewGaugeVec( + Nodes: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Subsystem: "netmap", - Name: "entries", - Help: "Number of entries in the netmap", + Namespace: "pico", + Subsystem: "cluster", + Name: "nodes", + Help: "Number of nodes in the cluster state", }, []string{"status"}, ), @@ -23,6 +24,6 @@ func NewMetrics() *Metrics { func (m *Metrics) Register(registry *prometheus.Registry) { registry.MustRegister( - m.Entries, + m.Nodes, ) } diff --git a/server/netmap/node.go b/server/cluster/node.go similarity index 99% rename from server/netmap/node.go rename to server/cluster/node.go index 78e27f7..2c03cc9 100644 --- a/server/netmap/node.go +++ b/server/cluster/node.go @@ -1,4 +1,4 @@ -package netmap +package cluster import ( "crypto/rand" diff --git a/server/cluster/state.go b/server/cluster/state.go new file mode 100644 index 0000000..7f4e9ac --- /dev/null +++ b/server/cluster/state.go @@ -0,0 +1,304 @@ +package cluster + +import ( + "sync" + + "github.com/andydunstall/pico/pkg/log" + "github.com/prometheus/client_golang/prometheus" +) + +// State represents the known state of the cluster as seen by the local +// node. +// +// This state is eventually consistent. +type State struct { + localID string + nodes map[string]*Node + + localEndpointSubscribers []func(endpointID string, listeners int) + + // mu protects the above fields. + mu sync.RWMutex + + metrics *Metrics + + logger log.Logger +} + +func NewState( + localNode *Node, + logger log.Logger, +) *State { + // The local node is always active. + localNode.Status = NodeStatusActive + nodes := make(map[string]*Node) + nodes[localNode.ID] = localNode + + s := &State{ + localID: localNode.ID, + nodes: nodes, + metrics: NewMetrics(), + logger: logger.WithSubsystem("cluster"), + } + s.addMetricsNode(localNode.Status) + return s +} + +// Node returns the known state of the node with the given ID, or false if the +// node is unknown. +func (s *State) Node(id string) (*Node, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + node, ok := s.nodes[id] + if !ok { + return nil, false + } + return node.Copy(), true +} + +// LocalID returns the ID of the local node. +func (s *State) LocalID() string { + // localID is immutable so don't need a mutex. + return s.localID +} + +// LocalNode returns the state of the local node. 
+func (s *State) LocalNode() *Node { + s.mu.RLock() + defer s.mu.RUnlock() + + node, ok := s.nodes[s.localID] + if !ok { + panic("local node not in cluster") + } + return node.Copy() +} + +// Nodes returns the state of the known nodes. +func (s *State) Nodes() []*Node { + s.mu.RLock() + defer s.mu.RUnlock() + + nodes := make([]*Node, 0, len(s.nodes)) + for _, node := range s.nodes { + nodes = append(nodes, node.Copy()) + } + return nodes +} + +func (s *State) LookupEndpoint(endpointID string) (*Node, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, node := range s.nodes { + if node.ID == s.localID { + // Ignore ourselves. + continue + } + if listeners, ok := node.Endpoints[endpointID]; ok && listeners > 0 { + return node.Copy(), true + } + } + + return nil, false +} + +// AddLocalEndpoint adds the active endpoint to the local node state. +func (s *State) AddLocalEndpoint(endpointID string) { + s.mu.Lock() + defer s.mu.Unlock() + + node, ok := s.nodes[s.localID] + if !ok { + panic("local node not in cluster") + } + + if node.Endpoints == nil { + node.Endpoints = make(map[string]int) + } + + node.Endpoints[endpointID] = node.Endpoints[endpointID] + 1 + + for _, f := range s.localEndpointSubscribers { + f(endpointID, node.Endpoints[endpointID]) + } +} + +// RemoveLocalEndpoint removes the active endpoint from the local node state. +func (s *State) RemoveLocalEndpoint(endpointID string) { + s.mu.Lock() + defer s.mu.Unlock() + + node, ok := s.nodes[s.localID] + if !ok { + panic("local node not in cluster") + } + + if node.Endpoints == nil { + node.Endpoints = make(map[string]int) + } + + listeners, ok := node.Endpoints[endpointID] + if !ok || listeners == 0 { + s.logger.Warn("remove local endpoint: endpoint not found") + return + } + + if listeners > 1 { + node.Endpoints[endpointID] = listeners - 1 + } else { + delete(node.Endpoints, endpointID) + } + + for _, f := range s.localEndpointSubscribers { + f(endpointID, node.Endpoints[endpointID]) + } +} + +// OnLocalEndpointUpdate subscribes to changes to the local nodes active +// endpoints. +// +// The callback is called with the cluster mutex locked so must not block or +// call back to the cluster. +func (s *State) OnLocalEndpointUpdate(f func(endpointID string, listeners int)) { + s.mu.Lock() + defer s.mu.Unlock() + + s.localEndpointSubscribers = append(s.localEndpointSubscribers, f) +} + +// AddNode adds the given node to the cluster. +func (s *State) AddNode(node *Node) { + s.mu.Lock() + defer s.mu.Unlock() + + if node.ID == s.localID { + s.logger.Warn("add node: cannot add local node") + return + } + + if _, ok := s.nodes[node.ID]; ok { + // If already in the cluster update the node but warn as this should + // not happen. + s.logger.Warn("add node: node already in cluster") + } + + s.nodes[node.ID] = node + s.addMetricsNode(node.Status) +} + +// RemoveNode removes the node with the given ID from the cluster. +func (s *State) RemoveNode(id string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if id == s.localID { + s.logger.Warn("remove node: cannot remove local node") + return false + } + + node, ok := s.nodes[id] + if !ok { + s.logger.Warn("remove node: node not in cluster") + return false + } + + delete(s.nodes, id) + s.removeMetricsNode(node.Status) + + return true +} + +// UpdateRemoteStatus sets the status of the remote node with the given ID. 
+func (s *State) UpdateRemoteStatus(id string, status NodeStatus) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if id == s.localID { + s.logger.Warn("update remote status: cannot update local node") + return false + } + + n, ok := s.nodes[id] + if !ok { + s.logger.Warn("update remote status: node not in cluster") + return false + } + + oldStatus := n.Status + n.Status = status + s.updateMetricsNode(oldStatus, status) + return true +} + +// UpdateRemoteEndpoint sets the number of listeners for the active endpoint +// for the node with the given ID. +func (s *State) UpdateRemoteEndpoint( + id string, + endpointID string, + listeners int, +) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if id == s.localID { + s.logger.Warn("update remote endpoint: cannot update local node") + return false + } + + n, ok := s.nodes[id] + if !ok { + s.logger.Warn("update remote endpoint: node not in cluster") + return false + } + + if n.Endpoints == nil { + n.Endpoints = make(map[string]int) + } + + n.Endpoints[endpointID] = listeners + + return true +} + +// RemoveRemoteEndpoint removes the active endpoint from the node with the +// given ID. +func (s *State) RemoveRemoteEndpoint(id string, endpointID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if id == s.localID { + s.logger.Warn("remove remote endpoint: cannot update local node") + return false + } + + n, ok := s.nodes[id] + if !ok { + s.logger.Warn("remove remote endpoint: node not in cluster") + return false + } + + if n.Endpoints != nil { + delete(n.Endpoints, endpointID) + } + + return true +} + +func (s *State) Metrics() *Metrics { + return s.metrics +} + +func (s *State) updateMetricsNode(oldStatus NodeStatus, newStatus NodeStatus) { + s.removeMetricsNode(oldStatus) + s.addMetricsNode(newStatus) +} + +func (s *State) addMetricsNode(status NodeStatus) { + s.metrics.Nodes.With(prometheus.Labels{"status": string(status)}).Inc() +} + +func (s *State) removeMetricsNode(status NodeStatus) { + s.metrics.Nodes.With(prometheus.Labels{"status": string(status)}).Dec() +} diff --git a/server/netmap/netmap_test.go b/server/cluster/state_test.go similarity index 59% rename from server/netmap/netmap_test.go rename to server/cluster/state_test.go index 9ac185a..ebb781b 100644 --- a/server/netmap/netmap_test.go +++ b/server/cluster/state_test.go @@ -1,4 +1,4 @@ -package netmap +package cluster import ( "sort" @@ -8,88 +8,88 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNetworkMap_LocalNode(t *testing.T) { +func TestState_LocalNode(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) - assert.Equal(t, "local", m.LocalID()) + assert.Equal(t, "local", s.LocalID()) - n, ok := m.Node("local") + n, ok := s.Node("local") assert.True(t, ok) assert.Equal(t, localNode, n) - assert.Equal(t, localNode, m.LocalNode()) + assert.Equal(t, localNode, s.LocalNode()) - assert.Equal(t, []*Node{localNode}, m.Nodes()) + assert.Equal(t, []*Node{localNode}, s.Nodes()) } -func TestNetworkMap_UpdateLocalEndpoint(t *testing.T) { +func TestState_UpdateLocalEndpoint(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) var notifyEndpointID string var notifyListeners int - m.OnLocalEndpointUpdate(func(endpointID string, listeners int) { + s.OnLocalEndpointUpdate(func(endpointID string, listeners int) { 
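		// Note: per the OnLocalEndpointUpdate contract in state.go above,
		// this callback runs with the cluster state mutex held, so it must
		// not block or call back into the State; the test body below only
		// records the most recent values.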
notifyEndpointID = endpointID notifyListeners = listeners }) - m.AddLocalEndpoint("my-endpoint") + s.AddLocalEndpoint("my-endpoint") assert.Equal(t, "my-endpoint", notifyEndpointID) assert.Equal(t, 1, notifyListeners) - n, _ := m.Node("local") + n, _ := s.Node("local") assert.Equal(t, 1, n.Endpoints["my-endpoint"]) - m.AddLocalEndpoint("my-endpoint") + s.AddLocalEndpoint("my-endpoint") assert.Equal(t, "my-endpoint", notifyEndpointID) assert.Equal(t, 2, notifyListeners) - n, _ = m.Node("local") + n, _ = s.Node("local") assert.Equal(t, 2, n.Endpoints["my-endpoint"]) - m.RemoveLocalEndpoint("my-endpoint") + s.RemoveLocalEndpoint("my-endpoint") assert.Equal(t, "my-endpoint", notifyEndpointID) assert.Equal(t, 1, notifyListeners) - n, _ = m.Node("local") + n, _ = s.Node("local") assert.Equal(t, 1, n.Endpoints["my-endpoint"]) - m.RemoveLocalEndpoint("my-endpoint") + s.RemoveLocalEndpoint("my-endpoint") assert.Equal(t, "my-endpoint", notifyEndpointID) assert.Equal(t, 0, notifyListeners) - n, _ = m.Node("local") + n, _ = s.Node("local") assert.Equal(t, 0, n.Endpoints["my-endpoint"]) // Removing an endpoint when none exist should have no affect. - m.RemoveLocalEndpoint("my-endpoint") + s.RemoveLocalEndpoint("my-endpoint") assert.Equal(t, "my-endpoint", notifyEndpointID) assert.Equal(t, 0, notifyListeners) - n, _ = m.Node("local") + n, _ = s.Node("local") assert.Equal(t, 0, n.Endpoints["my-endpoint"]) } -func TestNetworkMap_AddNode(t *testing.T) { +func TestState_AddNode(t *testing.T) { t.Run("add node", func(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusActive, } - m.AddNode(newNode) - n, ok := m.Node("remote") + s.AddNode(newNode) + n, ok := s.Node("remote") assert.True(t, ok) assert.Equal(t, newNode, n) - nodes := m.Nodes() + nodes := s.Nodes() // Sort to simplify comparison. sort.Slice(nodes, func(i, j int) bool { return nodes[i].ID < nodes[j].ID @@ -102,22 +102,22 @@ func TestNetworkMap_AddNode(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusActive, } - m.AddNode(newNode) + s.AddNode(newNode) // Attempting to add a node with the same ID should succeed. newNode = &Node{ ID: "remote", Status: NodeStatusDown, } - m.AddNode(newNode) + s.AddNode(newNode) - n, ok := m.Node("remote") + n, ok := s.Node("remote") assert.True(t, ok) assert.Equal(t, newNode, n) }) @@ -127,7 +127,7 @@ func TestNetworkMap_AddNode(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) // Add a new node with the same ID as the local node. The local node // should not be updated. 
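The hunks above and below exercise the full State lifecycle. For reference, a minimal sketch of that lifecycle using only constructors and methods from this patch (node and endpoint IDs illustrative):

	state := cluster.NewState(&cluster.Node{ID: "local"}, log.NewNopLogger())

	// Local endpoints are reference counted and notify subscribers.
	state.AddLocalEndpoint("my-endpoint")    // count 1
	state.AddLocalEndpoint("my-endpoint")    // count 2
	state.RemoveLocalEndpoint("my-endpoint") // back to 1

	// Remote nodes are tracked separately; the local ID is never replaced.
	state.AddNode(&cluster.Node{ID: "remote", Status: cluster.NodeStatusActive})
	state.UpdateRemoteEndpoint("remote", "my-endpoint", 1)

	// LookupEndpoint skips the local node and returns a remote node with
	// at least one listener for the endpoint.
	node, ok := state.LookupEndpoint("my-endpoint") // node.ID == "remote"
	if ok {
		// Route the request to node.ProxyAddr.
		_ = node.ProxyAddr
	}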
@@ -135,29 +135,29 @@ func TestNetworkMap_AddNode(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m.AddNode(newNode) - assert.Equal(t, localNode, m.LocalNode()) + s.AddNode(newNode) + assert.Equal(t, localNode, s.LocalNode()) }) } -func TestNetworkMap_RemoveNode(t *testing.T) { +func TestState_RemoveNode(t *testing.T) { t.Run("remove node", func(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusDown, } - m.AddNode(newNode) - assert.True(t, m.RemoveNode(newNode.ID)) - _, ok := m.Node("remote") + s.AddNode(newNode) + assert.True(t, s.RemoveNode(newNode.ID)) + _, ok := s.Node("remote") assert.False(t, ok) - assert.Equal(t, []*Node{localNode}, m.Nodes()) + assert.Equal(t, []*Node{localNode}, s.Nodes()) }) t.Run("remove local node", func(t *testing.T) { @@ -165,30 +165,30 @@ func TestNetworkMap_RemoveNode(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) // Attempting to delete the local node should have no affect. - assert.False(t, m.RemoveNode(localNode.ID)) - assert.Equal(t, localNode, m.LocalNode()) + assert.False(t, s.RemoveNode(localNode.ID)) + assert.Equal(t, localNode, s.LocalNode()) }) } -func TestNetworkMap_UpdateRemoteStatus(t *testing.T) { +func TestState_UpdateRemoteStatus(t *testing.T) { t.Run("update status", func(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusActive, } - m.AddNode(newNode) - assert.True(t, m.UpdateRemoteStatus("remote", NodeStatusDown)) + s.AddNode(newNode) + assert.True(t, s.UpdateRemoteStatus("remote", NodeStatusDown)) - n, _ := m.Node("remote") + n, _ := s.Node("remote") assert.Equal(t, NodeStatusDown, n.Status) }) @@ -197,30 +197,30 @@ func TestNetworkMap_UpdateRemoteStatus(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) // Attempting to update the local node should have no affect. 
- assert.False(t, m.UpdateRemoteStatus("local", NodeStatusDown)) - assert.Equal(t, localNode, m.LocalNode()) + assert.False(t, s.UpdateRemoteStatus("local", NodeStatusDown)) + assert.Equal(t, localNode, s.LocalNode()) }) } -func TestNetworkMap_UpdateRemoteEndpoint(t *testing.T) { +func TestState_UpdateRemoteEndpoint(t *testing.T) { t.Run("update endpoint", func(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusDown, } - m.AddNode(newNode) - assert.True(t, m.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) + s.AddNode(newNode) + assert.True(t, s.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) - n, _ := m.Node("remote") + n, _ := s.Node("remote") assert.Equal(t, 7, n.Endpoints["my-endpoint"]) }) @@ -229,31 +229,31 @@ func TestNetworkMap_UpdateRemoteEndpoint(t *testing.T) { ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) // Attempting to update the local node should have no affect. - assert.False(t, m.UpdateRemoteEndpoint("local", "my-endpoint", 7)) - assert.Equal(t, localNode, m.LocalNode()) + assert.False(t, s.UpdateRemoteEndpoint("local", "my-endpoint", 7)) + assert.Equal(t, localNode, s.LocalNode()) }) } -func TestNetworkMap_RemoveRemoteEndpoint(t *testing.T) { +func TestState_RemoveRemoteEndpoint(t *testing.T) { t.Run("update endpoint", func(t *testing.T) { localNode := &Node{ ID: "local", Status: NodeStatusActive, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) newNode := &Node{ ID: "remote", Status: NodeStatusActive, } - m.AddNode(newNode) - assert.True(t, m.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) - assert.True(t, m.RemoveRemoteEndpoint("remote", "my-endpoint")) + s.AddNode(newNode) + assert.True(t, s.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) + assert.True(t, s.RemoveRemoteEndpoint("remote", "my-endpoint")) - n, _ := m.Node("remote") + n, _ := s.Node("remote") assert.Equal(t, 0, n.Endpoints["my-endpoint"]) }) @@ -265,10 +265,10 @@ func TestNetworkMap_RemoveRemoteEndpoint(t *testing.T) { "my-endpoint": 7, }, } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + s := NewState(localNode.Copy(), log.NewNopLogger()) // Attempting to update the local node should have no affect. 
- assert.False(t, m.RemoveRemoteEndpoint("local", "my-endpoint")) - assert.Equal(t, localNode, m.LocalNode()) + assert.False(t, s.RemoveRemoteEndpoint("local", "my-endpoint")) + assert.Equal(t, localNode, s.LocalNode()) }) } diff --git a/server/netmap/status.go b/server/cluster/status.go similarity index 76% rename from server/netmap/status.go rename to server/cluster/status.go index b076333..4bea7c8 100644 --- a/server/netmap/status.go +++ b/server/cluster/status.go @@ -1,4 +1,4 @@ -package netmap +package cluster import ( "net/http" @@ -8,12 +8,12 @@ import ( ) type Status struct { - networkMap *NetworkMap + state *State } -func NewStatus(networkMap *NetworkMap) *Status { +func NewStatus(state *State) *Status { return &Status{ - networkMap: networkMap, + state: state, } } @@ -24,18 +24,18 @@ func (s *Status) Register(group *gin.RouterGroup) { } func (s *Status) listNodesRoute(c *gin.Context) { - nodes := s.networkMap.Nodes() + nodes := s.state.Nodes() c.JSON(http.StatusOK, nodes) } func (s *Status) getLocalNodeRoute(c *gin.Context) { - node := s.networkMap.LocalNode() + node := s.state.LocalNode() c.JSON(http.StatusOK, node) } func (s *Status) getNodeRoute(c *gin.Context) { id := c.Param("id") - node, ok := s.networkMap.Node(id) + node, ok := s.state.Node(id) if !ok { c.Status(http.StatusNotFound) return diff --git a/server/gossip/gossip.go b/server/gossip/gossip.go index 4882bf8..772b060 100644 --- a/server/gossip/gossip.go +++ b/server/gossip/gossip.go @@ -9,19 +9,19 @@ import ( "github.com/andydunstall/kite" "github.com/andydunstall/pico/pkg/backoff" "github.com/andydunstall/pico/pkg/log" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" "go.uber.org/zap" ) -// Gossip is responsible for maintaining this nodes local NetworkMap +// Gossip is responsible for maintaining this nodes local State // and propagating the state of the local node to the rest of the cluster. // // At the gossip layer, a nodes state is represented as key-value pairs which // are propagated around the cluster using Kite. These key-value pairs are then // used to gossip based anti-entropy protocol. These key-value pairs are then -// used to build the local NetworkMap. +// used to build the local State. type Gossip struct { - networkMap *netmap.NetworkMap + clusterState *cluster.State // gossiper manages communicating with the other members to exchange state // updates. 
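For context on the comment above: a node's state is flattened into string key-value pairs that Kite propagates around the cluster. Based on the syncer in this patch, the key scheme is roughly the following (addresses mirror the test fixtures; the gossiper interface with UpsertLocal and DeleteLocal is defined in server/gossip/syncer.go):

	// Immutable fields are upserted first so that peers can admit the
	// node to their cluster state.
	gossiper.UpsertLocal("proxy_addr", "10.26.104.56:8000")
	gossiper.UpsertLocal("admin_addr", "10.26.104.56:8001")

	// One mutable key per active endpoint, holding the listener count as
	// a formatted integer; deleting the key removes the endpoint.
	gossiper.UpsertLocal("endpoint:my-endpoint", "3")
	gossiper.DeleteLocal("endpoint:my-endpoint")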
@@ -33,7 +33,7 @@ type Gossip struct {
 }
 
 func NewGossip(
-	networkMap *netmap.NetworkMap,
+	clusterState *cluster.State,
 	streamLn net.Listener,
 	packetLn net.PacketConn,
 	conf Config,
@@ -41,9 +41,9 @@ func NewGossip(
 ) (*Gossip, error) {
 	logger = logger.WithSubsystem("gossip")
 
-	syncer := newSyncer(networkMap, logger)
+	syncer := newSyncer(clusterState, logger)
 	gossiper, err := kite.New(
-		kite.WithNodeID(networkMap.LocalNode().ID),
+		kite.WithNodeID(clusterState.LocalNode().ID),
 		kite.WithBindAddr(conf.BindAddr),
 		kite.WithAdvertiseAddr(conf.AdvertiseAddr),
 		kite.WithStreamListener(streamLn),
@@ -57,10 +57,10 @@ func NewGossip(
 	syncer.Sync(gossiper)
 
 	return &Gossip{
-		networkMap: networkMap,
-		gossiper:   gossiper,
-		conf:       conf,
-		logger:     logger,
+		clusterState: clusterState,
+		gossiper:     gossiper,
+		conf:         conf,
+		logger:       logger,
 	}, nil
 }
diff --git a/server/gossip/syncer.go b/server/gossip/syncer.go
index 052774f..b99073b 100644
--- a/server/gossip/syncer.go
+++ b/server/gossip/syncer.go
@@ -7,7 +7,7 @@ import (
 
 	"github.com/andydunstall/kite"
 	"github.com/andydunstall/pico/pkg/log"
-	"github.com/andydunstall/pico/server/netmap"
+	"github.com/andydunstall/pico/server/cluster"
 	"go.uber.org/zap"
 )
 
@@ -16,31 +16,31 @@ type gossiper interface {
 	DeleteLocal(key string)
 }
 
-// syncer handles syncronising state between gossip and the netmap.
+// syncer handles synchronising state between gossip and the cluster.
 //
-// When a node joins, it is considered 'pending' so not added to the netmap
+// When a node joins, it is considered 'pending' so not added to the cluster
 // until we have the full node state. Since gossip propagates state updates in
-// order, we only add a node to the netmap when we have the required immutable
+// order, we only add a node to the cluster when we have the required immutable
 // fields.
 type syncer struct {
 	// pendingNodes contains nodes that we haven't received the full state for
-	// yet so can't be added to the netmap.
-	pendingNodes map[string]*netmap.Node
+	// yet so can't be added to the cluster.
+	pendingNodes map[string]*cluster.Node
 
 	// mu protects the above fields.
 	mu sync.Mutex
 
-	networkMap *netmap.NetworkMap
+	clusterState *cluster.State
 
 	gossiper gossiper
 
 	logger log.Logger
 }
 
-func newSyncer(networkMap *netmap.NetworkMap, logger log.Logger) *syncer {
+func newSyncer(clusterState *cluster.State, logger log.Logger) *syncer {
 	return &syncer{
-		pendingNodes: make(map[string]*netmap.Node),
-		networkMap:   networkMap,
+		pendingNodes: make(map[string]*cluster.Node),
+		clusterState: clusterState,
 		logger:       logger,
 	}
 }
@@ -48,9 +48,9 @@ func newSyncer(networkMap *netmap.NetworkMap, logger log.Logger) *syncer {
 func (s *syncer) Sync(gossiper gossiper) {
 	s.gossiper = gossiper
 
-	s.networkMap.OnLocalEndpointUpdate(s.onLocalEndpointUpdate)
+	s.clusterState.OnLocalEndpointUpdate(s.onLocalEndpointUpdate)
 
-	localNode := s.networkMap.LocalNode()
+	localNode := s.clusterState.LocalNode()
 	// First add immutable fields.
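	// (Ordering matters here: remote syncers keep a node in pendingNodes
	// until both proxy_addr and admin_addr have arrived, and only then add
	// it to the cluster state, so these upserts must precede any mutable
	// endpoint keys.)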
s.gossiper.UpsertLocal("proxy_addr", localNode.ProxyAddr) s.gossiper.UpsertLocal("admin_addr", localNode.AdminAddr) @@ -62,7 +62,7 @@ func (s *syncer) Sync(gossiper gossiper) { } func (s *syncer) OnJoin(nodeID string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node joined; same id as local node", zap.String("node-id", nodeID), @@ -70,9 +70,9 @@ func (s *syncer) OnJoin(nodeID string) { return } - if _, ok := s.networkMap.Node(nodeID); ok { + if _, ok := s.clusterState.Node(nodeID); ok { s.logger.Warn( - "node joined; already in netmap", + "node joined; already in cluster", zap.String("node-id", nodeID), ) return @@ -90,8 +90,8 @@ func (s *syncer) OnJoin(nodeID string) { } // Add as pending since we don't have enough information to add to the - // netmap. - s.pendingNodes[nodeID] = &netmap.Node{ + // cluster. + s.pendingNodes[nodeID] = &cluster.Node{ ID: nodeID, } @@ -99,7 +99,7 @@ func (s *syncer) OnJoin(nodeID string) { } func (s *syncer) OnLeave(nodeID string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node healthy; same id as local node", zap.String("node-id", nodeID), @@ -107,9 +107,9 @@ func (s *syncer) OnLeave(nodeID string) { return } - if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusLeft); updated { + if updated := s.clusterState.UpdateRemoteStatus(nodeID, cluster.NodeStatusLeft); updated { s.logger.Info( - "node leave; updated netmap", + "node leave; updated cluster", zap.String("node-id", nodeID), ) return @@ -136,7 +136,7 @@ func (s *syncer) OnLeave(nodeID string) { } func (s *syncer) OnUp(nodeID string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node up; same id as local node", zap.String("node-id", nodeID), @@ -144,9 +144,9 @@ func (s *syncer) OnUp(nodeID string) { return } - if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusActive); updated { + if updated := s.clusterState.UpdateRemoteStatus(nodeID, cluster.NodeStatusActive); updated { s.logger.Info( - "node up; updated netmap", + "node up; updated cluster", zap.String("node-id", nodeID), ) return @@ -157,7 +157,7 @@ func (s *syncer) OnUp(nodeID string) { pending, ok := s.pendingNodes[nodeID] if ok { - pending.Status = netmap.NodeStatusActive + pending.Status = cluster.NodeStatusActive s.logger.Info( "node up; updated pending", @@ -172,7 +172,7 @@ func (s *syncer) OnUp(nodeID string) { } func (s *syncer) OnDown(nodeID string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node down; same id as local node", zap.String("node-id", nodeID), @@ -180,9 +180,9 @@ func (s *syncer) OnDown(nodeID string) { return } - if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusDown); updated { + if updated := s.clusterState.UpdateRemoteStatus(nodeID, cluster.NodeStatusDown); updated { s.logger.Info( - "node down; updated netmap", + "node down; updated cluster", zap.String("node-id", nodeID), ) return @@ -195,7 +195,7 @@ func (s *syncer) OnDown(nodeID string) { // come back. 
pending, ok := s.pendingNodes[nodeID] if ok { - pending.Status = netmap.NodeStatusDown + pending.Status = cluster.NodeStatusDown s.logger.Info( "node down; updated pending", @@ -210,7 +210,7 @@ func (s *syncer) OnDown(nodeID string) { } func (s *syncer) OnExpired(nodeID string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node expired; same id as local node", zap.String("node-id", nodeID), @@ -218,9 +218,9 @@ func (s *syncer) OnExpired(nodeID string) { return } - if removed := s.networkMap.RemoveNode(nodeID); removed { + if removed := s.clusterState.RemoveNode(nodeID); removed { s.logger.Info( - "node expired; removed from netmap", + "node expired; removed from cluster", zap.String("node-id", nodeID), ) return @@ -246,7 +246,7 @@ func (s *syncer) OnExpired(nodeID string) { } func (s *syncer) OnUpsertKey(nodeID, key, value string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node upsert state; same id as local node", zap.String("node-id", nodeID), @@ -255,7 +255,7 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { return } - // First check if the node is already in the netmap. Only check mutable + // First check if the node is already in the cluster. Only check mutable // fields. if strings.HasPrefix(key, "endpoint:") { endpointID, _ := strings.CutPrefix(key, "endpoint:") @@ -269,7 +269,7 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { ) return } - if s.networkMap.UpdateRemoteEndpoint(nodeID, endpointID, listeners) { + if s.clusterState.UpdateRemoteEndpoint(nodeID, endpointID, listeners) { return } } @@ -315,19 +315,19 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { return } - // Once we have the nodes immutable fields it can be added to the netmap. + // Once we have the nodes immutable fields it can be added to the cluster. if node.ProxyAddr != "" && node.AdminAddr != "" { if node.Status == "" { // Unless we've received a down/leave notification, we consider // the node as active. 
- node.Status = netmap.NodeStatusActive + node.Status = cluster.NodeStatusActive } delete(s.pendingNodes, node.ID) - s.networkMap.AddNode(node) + s.clusterState.AddNode(node) s.logger.Debug( - "node upsert state; added to netmap", + "node upsert state; added to cluster", zap.String("node-id", nodeID), zap.String("key", key), zap.String("value", value), @@ -343,7 +343,7 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { } func (s *syncer) OnDeleteKey(nodeID, key string) { - if nodeID == s.networkMap.LocalID() { + if nodeID == s.clusterState.LocalID() { s.logger.Warn( "node delete state; same id as local node", zap.String("node-id", nodeID), @@ -363,9 +363,9 @@ func (s *syncer) OnDeleteKey(nodeID, key string) { } endpointID, _ := strings.CutPrefix(key, "endpoint:") - if s.networkMap.RemoveRemoteEndpoint(nodeID, endpointID) { + if s.clusterState.RemoveRemoteEndpoint(nodeID, endpointID) { s.logger.Debug( - "node delete state; netmap updated", + "node delete state; cluster updated", zap.String("node-id", nodeID), zap.String("key", key), ) diff --git a/server/gossip/syncer_test.go b/server/gossip/syncer_test.go index c465ece..77d9c2f 100644 --- a/server/gossip/syncer_test.go +++ b/server/gossip/syncer_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/andydunstall/pico/pkg/log" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" "github.com/stretchr/testify/assert" ) @@ -32,12 +32,12 @@ func (g *fakeGossiper) DeleteLocal(key string) { var _ gossiper = &fakeGossiper{} func TestSyncer_Sync(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) m.AddLocalEndpoint("my-endpoint") m.AddLocalEndpoint("my-endpoint") m.AddLocalEndpoint("my-endpoint") @@ -59,12 +59,12 @@ func TestSyncer_Sync(t *testing.T) { } func TestSyncer_OnLocalEndpointUpdate(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -102,12 +102,12 @@ func TestSyncer_OnLocalEndpointUpdate(t *testing.T) { func TestSyncer_RemoteNodeUpdate(t *testing.T) { t.Run("add node", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -121,9 +121,9 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -133,12 +133,12 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { }) t.Run("add node missing state", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := 
cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -154,13 +154,13 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { }) t.Run("add local node", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -176,13 +176,13 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { }) t.Run("update node", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -202,9 +202,9 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -216,12 +216,12 @@ func TestSyncer_RemoteNodeUpdate(t *testing.T) { func TestSyncer_RemoteNodeLeave(t *testing.T) { t.Run("active node leave", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -234,14 +234,14 @@ func TestSyncer_RemoteNodeLeave(t *testing.T) { sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - // Leaving should update the netmap. + // Leaving should update the cluster. sync.OnLeave("remote") node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusLeft, + Status: cluster.NodeStatusLeft, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -251,18 +251,18 @@ func TestSyncer_RemoteNodeLeave(t *testing.T) { sync.OnExpired("remote") - // Expiring should remove from the netmap. + // Expiring should remove from the cluster. 
_, ok = m.Node("remote") assert.False(t, ok) }) t.Run("pending node leave", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -283,13 +283,13 @@ func TestSyncer_RemoteNodeLeave(t *testing.T) { }) t.Run("local node leave", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -305,12 +305,12 @@ func TestSyncer_RemoteNodeLeave(t *testing.T) { func TestSyncer_RemoteNodeDown(t *testing.T) { t.Run("active node", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -323,14 +323,14 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - // Marking a node down should update the netmap. + // Marking a node down should update the cluster. sync.OnDown("remote") node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusDown, + Status: cluster.NodeStatusDown, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -340,18 +340,18 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { sync.OnExpired("remote") - // Expiring should remove from the netmap. + // Expiring should remove from the cluster. _, ok = m.Node("remote") assert.False(t, ok) }) t.Run("active node recovers", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -364,17 +364,17 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - // Marking a node down should update the netmap. + // Marking a node down should update the cluster. sync.OnDown("remote") - // Marking a node healthy should update the netmap. + // Marking a node healthy should update the cluster. 
sync.OnUp("remote") node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -384,12 +384,12 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { }) t.Run("pending node down", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -408,21 +408,21 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { node, ok := m.Node("remote") assert.True(t, ok) - assert.Equal(t, node, &netmap.Node{ + assert.Equal(t, node, &cluster.Node{ ID: "remote", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", }) }) t.Run("pending node expires", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) @@ -444,13 +444,13 @@ func TestSyncer_RemoteNodeDown(t *testing.T) { }) t.Run("local node leave", func(t *testing.T) { - localNode := &netmap.Node{ + localNode := &cluster.Node{ ID: "local", - Status: netmap.NodeStatusActive, + Status: cluster.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + m := cluster.NewState(localNode.Copy(), log.NewNopLogger()) sync := newSyncer(m, log.NewNopLogger()) diff --git a/server/netmap/netmap.go b/server/netmap/netmap.go deleted file mode 100644 index 54b042b..0000000 --- a/server/netmap/netmap.go +++ /dev/null @@ -1,305 +0,0 @@ -package netmap - -import ( - "sync" - - "github.com/andydunstall/pico/pkg/log" - "github.com/prometheus/client_golang/prometheus" -) - -// NetworkMap represents the known state of the cluster as seen by the local -// node. -// -// This map is eventually consistent. The state is propagated among the nodes -// in the cluster using gossip. -type NetworkMap struct { - localID string - nodes map[string]*Node - - localEndpointSubscribers []func(endpointID string, listeners int) - - // mu protects the above fields. - mu sync.RWMutex - - metrics *Metrics - - logger log.Logger -} - -func NewNetworkMap( - localNode *Node, - logger log.Logger, -) *NetworkMap { - // The local node is always active. - localNode.Status = NodeStatusActive - nodes := make(map[string]*Node) - nodes[localNode.ID] = localNode - - m := &NetworkMap{ - localID: localNode.ID, - nodes: nodes, - metrics: NewMetrics(), - logger: logger.WithSubsystem("netmap"), - } - m.addMetricsEntry(localNode.Status) - return m -} - -// Node returns the known state of the node with the given ID, or false if the -// node is unknown. -func (m *NetworkMap) Node(id string) (*Node, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - - node, ok := m.nodes[id] - if !ok { - return nil, false - } - return node.Copy(), true -} - -// LocalID returns the ID of the local node. -func (m *NetworkMap) LocalID() string { - // localID is immutable so don't need a mutex. 
- return m.localID -} - -// LocalNode returns the state of the local node. -func (m *NetworkMap) LocalNode() *Node { - m.mu.RLock() - defer m.mu.RUnlock() - - node, ok := m.nodes[m.localID] - if !ok { - panic("local node not in netmap") - } - return node.Copy() -} - -// Nodes returns the state of the known nodes. -func (m *NetworkMap) Nodes() []*Node { - m.mu.RLock() - defer m.mu.RUnlock() - - nodes := make([]*Node, 0, len(m.nodes)) - for _, node := range m.nodes { - nodes = append(nodes, node.Copy()) - } - return nodes -} - -func (m *NetworkMap) LookupEndpoint(endpointID string) (*Node, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - - for _, node := range m.nodes { - if node.ID == m.localID { - // Ignore ourselves. - continue - } - if listeners, ok := node.Endpoints[endpointID]; ok && listeners > 0 { - return node.Copy(), true - } - } - - return nil, false -} - -// AddLocalEndpoint adds the active endpoint to the local node state. -func (m *NetworkMap) AddLocalEndpoint(endpointID string) { - m.mu.Lock() - defer m.mu.Unlock() - - node, ok := m.nodes[m.localID] - if !ok { - panic("local node not in netmap") - } - - if node.Endpoints == nil { - node.Endpoints = make(map[string]int) - } - - node.Endpoints[endpointID] = node.Endpoints[endpointID] + 1 - - for _, f := range m.localEndpointSubscribers { - f(endpointID, node.Endpoints[endpointID]) - } -} - -// RemoveLocalEndpoint removes the active endpoint from the local node state. -func (m *NetworkMap) RemoveLocalEndpoint(endpointID string) { - m.mu.Lock() - defer m.mu.Unlock() - - node, ok := m.nodes[m.localID] - if !ok { - panic("local node not in netmap") - } - - if node.Endpoints == nil { - node.Endpoints = make(map[string]int) - } - - listeners, ok := node.Endpoints[endpointID] - if !ok || listeners == 0 { - m.logger.Warn("remove local endpoint: endpoint not found") - return - } - - if listeners > 1 { - node.Endpoints[endpointID] = listeners - 1 - } else { - delete(node.Endpoints, endpointID) - } - - for _, f := range m.localEndpointSubscribers { - f(endpointID, node.Endpoints[endpointID]) - } -} - -// OnLocalEndpointUpdate subscribes to changes to the local nodes active -// endpoints. -// -// The callback is called with the netmap mutex locked so must not block or -// call back to the netmap. -func (m *NetworkMap) OnLocalEndpointUpdate(f func(endpointID string, listeners int)) { - m.mu.Lock() - defer m.mu.Unlock() - - m.localEndpointSubscribers = append(m.localEndpointSubscribers, f) -} - -// AddNode adds the given node to the netmap. -func (m *NetworkMap) AddNode(node *Node) { - m.mu.Lock() - defer m.mu.Unlock() - - if node.ID == m.localID { - m.logger.Warn("add node: cannot add local node") - return - } - - if _, ok := m.nodes[node.ID]; ok { - // If already in the netmap update the node but warn as this should - // not happen. - m.logger.Warn("add node: node already in netmap") - } - - m.nodes[node.ID] = node - m.addMetricsEntry(node.Status) -} - -// RemoveNode removes the node with the given ID from the netmap. -func (m *NetworkMap) RemoveNode(id string) bool { - m.mu.Lock() - defer m.mu.Unlock() - - if id == m.localID { - m.logger.Warn("remove node: cannot remove local node") - return false - } - - node, ok := m.nodes[id] - if !ok { - m.logger.Warn("remove node: node not in netmap") - return false - } - - delete(m.nodes, id) - m.removeMetricsEntry(node.Status) - - return true -} - -// UpdateRemoteStatus sets the status of the remote node with the given ID. 
-func (m *NetworkMap) UpdateRemoteStatus(id string, status NodeStatus) bool { - m.mu.Lock() - defer m.mu.Unlock() - - if id == m.localID { - m.logger.Warn("update remote status: cannot update local node") - return false - } - - n, ok := m.nodes[id] - if !ok { - m.logger.Warn("update remote status: node not in netmap") - return false - } - - oldStatus := n.Status - n.Status = status - m.updateMetricsEntry(oldStatus, status) - return true -} - -// UpdateRemoteEndpoint sets the number of listeners for the active endpoint -// for the node with the given ID. -func (m *NetworkMap) UpdateRemoteEndpoint( - id string, - endpointID string, - listeners int, -) bool { - m.mu.Lock() - defer m.mu.Unlock() - - if id == m.localID { - m.logger.Warn("update remote endpoint: cannot update local node") - return false - } - - n, ok := m.nodes[id] - if !ok { - m.logger.Warn("update remote endpoint: node not in netmap") - return false - } - - if n.Endpoints == nil { - n.Endpoints = make(map[string]int) - } - - n.Endpoints[endpointID] = listeners - - return true -} - -// RemoveRemoteEndpoint removes the active endpoint from the node with the -// given ID. -func (m *NetworkMap) RemoveRemoteEndpoint(id string, endpointID string) bool { - m.mu.Lock() - defer m.mu.Unlock() - - if id == m.localID { - m.logger.Warn("remove remote endpoint: cannot update local node") - return false - } - - n, ok := m.nodes[id] - if !ok { - m.logger.Warn("remove remote endpoint: node not in netmap") - return false - } - - if n.Endpoints != nil { - delete(n.Endpoints, endpointID) - } - - return true -} - -func (m *NetworkMap) Metrics() *Metrics { - return m.metrics -} - -func (m *NetworkMap) updateMetricsEntry(oldStatus NodeStatus, newStatus NodeStatus) { - m.removeMetricsEntry(oldStatus) - m.addMetricsEntry(newStatus) -} - -func (m *NetworkMap) addMetricsEntry(s NodeStatus) { - m.metrics.Entries.With(prometheus.Labels{"status": string(s)}).Inc() -} - -func (m *NetworkMap) removeMetricsEntry(s NodeStatus) { - m.metrics.Entries.With(prometheus.Labels{"status": string(s)}).Dec() -} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 252a860..c4ba543 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -11,7 +11,7 @@ import ( "time" "github.com/andydunstall/pico/pkg/log" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" "go.uber.org/zap" ) @@ -27,7 +27,7 @@ type Proxy struct { logger log.Logger } -func NewProxy(networkMap *netmap.NetworkMap, opts ...Option) *Proxy { +func NewProxy(clusterState *cluster.State, opts ...Option) *Proxy { options := defaultOptions() for _, opt := range opts { opt.apply(&options) @@ -36,7 +36,7 @@ func NewProxy(networkMap *netmap.NetworkMap, opts ...Option) *Proxy { logger := options.logger.WithSubsystem("proxy") return &Proxy{ local: newLocalProxy(logger), - remote: newRemoteProxy(networkMap, options.forwarder, logger), + remote: newRemoteProxy(clusterState, options.forwarder, logger), logger: logger, } } diff --git a/server/proxy/proxy_test.go b/server/proxy/proxy_test.go index 4bcaf50..44d92e4 100644 --- a/server/proxy/proxy_test.go +++ b/server/proxy/proxy_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/andydunstall/pico/pkg/log" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" "github.com/stretchr/testify/assert" ) @@ -48,8 +48,8 @@ func (f *fakeForwarder) Request( func TestProxy(t *testing.T) { t.Run("forward request remote ok", func(t *testing.T) { - networkMap := 
netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()) - networkMap.AddNode(&netmap.Node{ + networkMap := cluster.NewState(&cluster.Node{}, log.NewNopLogger()) + networkMap.AddNode(&cluster.Node{ ID: "node-1", ProxyAddr: "1.2.3.4:1234", Endpoints: map[string]int{ @@ -82,8 +82,8 @@ func TestProxy(t *testing.T) { }) t.Run("forward request remote endpoint timeout", func(t *testing.T) { - networkMap := netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()) - networkMap.AddNode(&netmap.Node{ + networkMap := cluster.NewState(&cluster.Node{}, log.NewNopLogger()) + networkMap.AddNode(&cluster.Node{ ID: "node-1", ProxyAddr: "1.2.3.4:1234", Endpoints: map[string]int{ @@ -116,8 +116,8 @@ func TestProxy(t *testing.T) { }) t.Run("forward request remote endpoint unreachable", func(t *testing.T) { - networkMap := netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()) - networkMap.AddNode(&netmap.Node{ + networkMap := cluster.NewState(&cluster.Node{}, log.NewNopLogger()) + networkMap.AddNode(&cluster.Node{ ID: "node-1", ProxyAddr: "1.2.3.4:1234", Endpoints: map[string]int{ @@ -150,7 +150,7 @@ func TestProxy(t *testing.T) { t.Run("forward request remote endpoint not found", func(t *testing.T) { proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) header := make(http.Header) @@ -182,7 +182,7 @@ func TestProxy(t *testing.T) { } proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) proxy.AddConn(conn) @@ -207,7 +207,7 @@ func TestProxy(t *testing.T) { } proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) proxy.AddConn(conn) @@ -236,7 +236,7 @@ func TestProxy(t *testing.T) { } proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) proxy.AddConn(conn) @@ -256,7 +256,7 @@ func TestProxy(t *testing.T) { t.Run("forward request local endpoint not found", func(t *testing.T) { proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) header := make(http.Header) @@ -289,8 +289,8 @@ func TestProxy(t *testing.T) { }) t.Run("add conn", func(t *testing.T) { - networkMap := netmap.NewNetworkMap( - &netmap.Node{ + networkMap := cluster.NewState( + &cluster.Node{ ID: "local", }, log.NewNopLogger(), ) @@ -300,7 +300,7 @@ func TestProxy(t *testing.T) { endpointID: "my-endpoint", } proxy.AddConn(conn) - // Verify the netmap was updated. + // Verify the cluster was updated. 
assert.Equal(t, map[string]int{ "my-endpoint": 1, }, networkMap.LocalNode().Endpoints) @@ -311,7 +311,7 @@ func TestProxy(t *testing.T) { t.Run("missing endpoint", func(t *testing.T) { proxy := NewProxy( - netmap.NewNetworkMap(&netmap.Node{}, log.NewNopLogger()), + cluster.NewState(&cluster.Node{}, log.NewNopLogger()), ) resp := proxy.Request(context.TODO(), &http.Request{ diff --git a/server/proxy/remote.go b/server/proxy/remote.go index 13b7957..8100563 100644 --- a/server/proxy/remote.go +++ b/server/proxy/remote.go @@ -6,13 +6,13 @@ import ( "github.com/andydunstall/pico/pkg/forwarder" "github.com/andydunstall/pico/pkg/log" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" ) // remoteProxy is responsible for forwarding requests to Pico server nodes with // an upstream connection for the target endpoint. type remoteProxy struct { - networkMap *netmap.NetworkMap + clusterState *cluster.State forwarder forwarder.Forwarder @@ -20,14 +20,14 @@ type remoteProxy struct { } func newRemoteProxy( - networkMap *netmap.NetworkMap, + clusterState *cluster.State, forwarder forwarder.Forwarder, logger log.Logger, ) *remoteProxy { return &remoteProxy{ - networkMap: networkMap, - forwarder: forwarder, - logger: logger, + clusterState: clusterState, + forwarder: forwarder, + logger: logger, } } @@ -44,13 +44,13 @@ func (p *remoteProxy) Request( } func (p *remoteProxy) AddConn(conn Conn) { - // Update the netmap to notify other nodes that we have a connection for + // Update the cluster to notify other nodes that we have a connection for // the endpoint. - p.networkMap.AddLocalEndpoint(conn.EndpointID()) + p.clusterState.AddLocalEndpoint(conn.EndpointID()) } func (p *remoteProxy) RemoveConn(conn Conn) { - p.networkMap.RemoveLocalEndpoint(conn.EndpointID()) + p.clusterState.RemoveLocalEndpoint(conn.EndpointID()) } // findNode looks up a node with an upstream connection for the given endpoint @@ -58,7 +58,7 @@ func (p *remoteProxy) RemoveConn(conn Conn) { func (p *remoteProxy) findNode(endpointID string) (string, bool) { // TODO(andydunstall): This doesn't yet do any load balancing. It just // selects the first node. 
- node, ok := p.networkMap.LookupEndpoint(endpointID) + node, ok := p.clusterState.LookupEndpoint(endpointID) if !ok { return "", false } diff --git a/server/server.go b/server/server.go index d3eba1e..c221a8a 100644 --- a/server/server.go +++ b/server/server.go @@ -9,9 +9,9 @@ import ( "github.com/andydunstall/pico/pkg/log" "github.com/andydunstall/pico/server/auth" + "github.com/andydunstall/pico/server/cluster" "github.com/andydunstall/pico/server/config" "github.com/andydunstall/pico/server/gossip" - "github.com/andydunstall/pico/server/netmap" "github.com/andydunstall/pico/server/proxy" adminserver "github.com/andydunstall/pico/server/server/admin" proxyserver "github.com/andydunstall/pico/server/server/proxy" @@ -67,7 +67,7 @@ func NewServer(conf *config.Config, logger log.Logger) (*Server, error) { } if conf.Cluster.NodeID == "" { - nodeID := netmap.GenerateNodeID() + nodeID := cluster.GenerateNodeID() if conf.Cluster.NodeIDPrefix != "" { nodeID = conf.Cluster.NodeIDPrefix + nodeID } @@ -165,16 +165,16 @@ func (s *Server) Run(ctx context.Context) error { s.logger, ) - networkMap := netmap.NewNetworkMap(&netmap.Node{ + clusterState := cluster.NewState(&cluster.Node{ ID: s.conf.Cluster.NodeID, ProxyAddr: s.conf.Proxy.AdvertiseAddr, AdminAddr: s.conf.Admin.AdvertiseAddr, }, s.logger) - networkMap.Metrics().Register(registry) - adminServer.AddStatus("/netmap", netmap.NewStatus(networkMap)) + clusterState.Metrics().Register(registry) + adminServer.AddStatus("/cluster", cluster.NewStatus(clusterState)) gossiper, err := gossip.NewGossip( - networkMap, + clusterState, s.gossipStreamLn, s.gossipPacketLn, s.conf.Gossip, @@ -203,7 +203,7 @@ func (s *Server) Run(ctx context.Context) error { ) } - p := proxy.NewProxy(networkMap, proxy.WithLogger(s.logger)) + p := proxy.NewProxy(clusterState, proxy.WithLogger(s.logger)) adminServer.AddStatus("/proxy", proxy.NewStatus(p)) proxyServer := proxyserver.NewServer( diff --git a/status/client/client.go b/status/client/client.go index 1d43f6e..db0403e 100644 --- a/status/client/client.go +++ b/status/client/client.go @@ -10,7 +10,7 @@ import ( "time" "github.com/andydunstall/kite" - "github.com/andydunstall/pico/server/netmap" + "github.com/andydunstall/pico/server/cluster" ) type Client struct { @@ -42,28 +42,28 @@ func (c *Client) ProxyEndpoints() (map[string][]string, error) { return endpoints, nil } -func (c *Client) NetmapNodes() ([]*netmap.Node, error) { - r, err := c.request("/status/netmap/nodes") +func (c *Client) ClusterNodes() ([]*cluster.Node, error) { + r, err := c.request("/status/cluster/nodes") if err != nil { return nil, err } defer r.Close() - var nodes []*netmap.Node + var nodes []*cluster.Node if err := json.NewDecoder(r).Decode(&nodes); err != nil { return nil, fmt.Errorf("decode response: %w", err) } return nodes, nil } -func (c *Client) NetmapNode(nodeID string) (*netmap.Node, error) { - r, err := c.request("/status/netmap/nodes/" + nodeID) +func (c *Client) ClusterNode(nodeID string) (*cluster.Node, error) { + r, err := c.request("/status/cluster/nodes/" + nodeID) if err != nil { return nil, err } defer r.Close() - var node netmap.Node + var node cluster.Node if err := json.NewDecoder(r).Decode(&node); err != nil { return nil, fmt.Errorf("decode response: %w", err) } diff --git a/tests/cluster_test.go b/tests/cluster_test.go index d6fea47..d4c4da4 100644 --- a/tests/cluster_test.go +++ b/tests/cluster_test.go @@ -54,7 +54,7 @@ func TestCluster(t *testing.T) { Scheme: "http", Host: addr, }) - nodes, err := statusClient.NetmapNodes() 
+ nodes, err := statusClient.ClusterNodes() assert.NoError(t, err) if len(nodes) < 2 {
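Taken together, the server wiring after this rename reduces to the following condensed sketch (adapted from the server/server.go hunks above; the registry, listeners, logger, and admin server come from the surrounding setup code, and error handling is elided):

	state := cluster.NewState(&cluster.Node{
		ID:        conf.Cluster.NodeID,
		ProxyAddr: conf.Proxy.AdvertiseAddr,
		AdminAddr: conf.Admin.AdvertiseAddr,
	}, logger)
	state.Metrics().Register(registry)
	adminServer.AddStatus("/cluster", cluster.NewStatus(state))

	gossiper, err := gossip.NewGossip(
		state, gossipStreamLn, gossipPacketLn, conf.Gossip, logger,
	)
	if err != nil {
		return err
	}

	p := proxy.NewProxy(state, proxy.WithLogger(logger))
	adminServer.AddStatus("/proxy", proxy.NewStatus(p))

Every component that previously took a *netmap.NetworkMap now receives the same *cluster.State value, so the cluster state remains the single shared view of the cluster across gossip, the proxy, and the admin status API.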