diff --git a/Makefile b/Makefile index 1bd6666..92c8623 100644 --- a/Makefile +++ b/Makefile @@ -18,3 +18,8 @@ lint: .PHONY: generate generate: protoc --go_out=. --go_opt=paths=source_relative api/rpc.proto + +.PHONY: coverage +coverage: + go test ./... -coverprofile=coverage.out + go tool cover -html=coverage.out diff --git a/cli/server/command.go b/cli/server/command.go index 10f5565..f50c6c7 100644 --- a/cli/server/command.go +++ b/cli/server/command.go @@ -448,7 +448,6 @@ func runv2(conf *config.Config, logger log.Logger) error { networkMap := netmapv2.NewNetworkMap(&netmapv2.Node{ ID: conf.Cluster.NodeID, - Status: netmapv2.NodeStatusActive, ProxyAddr: conf.Proxy.AdvertiseAddr, AdminAddr: conf.Admin.AdvertiseAddr, }, logger) @@ -488,8 +487,6 @@ func runv2(conf *config.Config, logger log.Logger) error { zap.String("signal", sig.String()), ) - networkMap.UpdateLocalStatus(netmapv2.NodeStatusLeft) - // Leave as soon as we receive the shutdown signal to avoid receiving // forward proxy requests. 
if err := gossip.Leave(); err != nil { diff --git a/go.mod b/go.mod index 22acb78..0bc879d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/andydunstall/pico go 1.21.1 require ( - github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac + github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f github.com/gin-gonic/gin v1.9.1 github.com/goccy/go-yaml v1.11.3 github.com/gorilla/websocket v1.5.1 diff --git a/go.sum b/go.sum index df63096..c9e401f 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac h1:N8JXYaQmKcOUF943KzmOIwKCw+uu+WL7yvNbA+wjZZc= -github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I= +github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f h1:VFrRSjIOVtFHY9fe7Sqji747Bn5Lbe3r1u5ydPnYAtI= +github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.11.5 h1:G00FYjjqll5iQ1PYXynbg/hyzqBqavH8Mo9/oTopd9k= diff --git a/server/gossip/gossip.go b/server/gossip/gossip.go index a0e8ab6..b23aa48 100644 --- a/server/gossip/gossip.go +++ b/server/gossip/gossip.go @@ -333,6 +333,10 @@ func (w *kiteWatcher) OnHealthy(_ string) { // TODO(andydunstall) } +func (w *kiteWatcher) OnExpired(_ string) { + // TODO(andydunstall) +} + func (w *kiteWatcher) OnDown(memberID string) { w.gossip.onRemoteDown(memberID) } diff --git a/server/gossipv2/syncer.go b/server/gossipv2/syncer.go index 65ad0c7..b8c3cfd 100644 --- a/server/gossipv2/syncer.go +++ b/server/gossipv2/syncer.go @@ -20,8 +20,8 @@ type gossiper interface { // // When a node joins, it is considered 'pending' so not added to the netmap // until we have the full node state. 
Since gossip propagates state updates in -// order, we always send the initial node state status last, meaning if we -// have the status of a remote node, we'll have its other immutable fields. +// order, we only add a node to the netmap when we have the required immutable +// fields. type syncer struct { // pendingNodes contains nodes that we haven't received the full state for // yet so can't be added to the netmap. @@ -48,16 +48,12 @@ func newSyncer(networkMap *netmap.NetworkMap, logger log.Logger) *syncer { func (s *syncer) Sync(gossiper gossiper) { s.gossiper = gossiper - s.networkMap.OnLocalStatusUpdate(s.onLocalStatusUpdate) s.networkMap.OnLocalEndpointUpdate(s.onLocalEndpointUpdate) localNode := s.networkMap.LocalNode() // First add immutable fields. s.gossiper.UpsertLocal("proxy_addr", localNode.ProxyAddr) s.gossiper.UpsertLocal("admin_addr", localNode.AdminAddr) - // Next add status, which means receiving nodes will consider the node - // state 'complete' so add to the netmap. - s.gossiper.UpsertLocal("status", string(localNode.Status)) // Finally add mutable fields. for endpointID, listeners := range localNode.Endpoints { key := "endpoint:" + endpointID @@ -103,9 +99,17 @@ func (s *syncer) OnJoin(nodeID string) { } func (s *syncer) OnLeave(nodeID string) { - if deleted := s.networkMap.RemoveNode(nodeID); deleted { + if nodeID == s.networkMap.LocalID() { + s.logger.Warn( + "node leave; same id as local node", + zap.String("node-id", nodeID), + ) + return + } + + if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusLeft); updated { s.logger.Info( - "node leave; removed from netmap", + "node leave; updated netmap", zap.String("node-id", nodeID), ) return @@ -114,6 +118,7 @@ func (s *syncer) OnLeave(nodeID string) { s.mu.Lock() defer s.mu.Unlock() + // If a pending node has left it can be discarded. 
_, ok := s.pendingNodes[nodeID] if ok { delete(s.pendingNodes, nodeID) @@ -130,17 +135,114 @@ func (s *syncer) OnLeave(nodeID string) { } } -func (s *syncer) OnHealthy(_ string) { - // TODO(andydunstall): - // If a node goes down then comes back, need to ensure we still have the - // state of that node. Therefore if a node is down, mark it as down in the - // netmap but don't remove. Wait for Kite to send OnLeave before actually - // removing from the netmap (since we know Kite will do an OnJoin if it - // comes back). Therefore need to update Kite. +func (s *syncer) OnHealthy(nodeID string) { + if nodeID == s.networkMap.LocalID() { + s.logger.Warn( + "node healthy; same id as local node", + zap.String("node-id", nodeID), + ) + return + } + + if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusActive); updated { + s.logger.Info( + "node healthy; updated netmap", + zap.String("node-id", nodeID), + ) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + pending, ok := s.pendingNodes[nodeID] + if ok { + pending.Status = netmap.NodeStatusActive + + s.logger.Info( + "node healthy; updated pending", + zap.String("node-id", nodeID), + ) + } else { + s.logger.Warn( + "node healthy; unknown node", + zap.String("node-id", nodeID), + ) + } +} + +func (s *syncer) OnDown(nodeID string) { + if nodeID == s.networkMap.LocalID() { + s.logger.Warn( + "node down; same id as local node", + zap.String("node-id", nodeID), + ) + return + } + + if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusDown); updated { + s.logger.Info( + "node down; updated netmap", + zap.String("node-id", nodeID), + ) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Update pending status. We must still retain the pending node as it may + // come back. 
+ pending, ok := s.pendingNodes[nodeID] + if ok { + pending.Status = netmap.NodeStatusDown + + s.logger.Info( + "node down; updated pending", + zap.String("node-id", nodeID), + ) + } else { + s.logger.Warn( + "node down; unknown node", + zap.String("node-id", nodeID), + ) + } } -func (s *syncer) OnDown(_ string) { - // TODO(andydunstall): See above. +func (s *syncer) OnExpired(nodeID string) { + if nodeID == s.networkMap.LocalID() { + s.logger.Warn( + "node expired; same id as local node", + zap.String("node-id", nodeID), + ) + return + } + + if removed := s.networkMap.RemoveNode(nodeID); removed { + s.logger.Info( + "node expired; removed from netmap", + zap.String("node-id", nodeID), + ) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + _, ok := s.pendingNodes[nodeID] + if ok { + delete(s.pendingNodes, nodeID) + + s.logger.Info( + "node expired; removed from pending", + zap.String("node-id", nodeID), + ) + } else { + s.logger.Warn( + "node expired; unknown node", + zap.String("node-id", nodeID), + ) + } } func (s *syncer) OnUpsertKey(nodeID, key, value string) { @@ -155,11 +257,7 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { // First check if the node is already in the netmap. Only check mutable // fields. 
- if key == "status" { - if s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatus(value)) { - return - } - } else if strings.HasPrefix(key, "endpoint:") { + if strings.HasPrefix(key, "endpoint:") { endpointID, _ := strings.CutPrefix(key, "endpoint:") listeners, err := strconv.Atoi(value) if err != nil { @@ -188,10 +286,6 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { zap.String("value", value), ) return - } - - if key == "status" { - node.Status = netmap.NodeStatus(value) } else if key == "proxy_addr" { node.ProxyAddr = value } else if key == "admin_addr" { @@ -221,9 +315,14 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) { return } - // Once we have the node status for the pending node, it can be added to - // the netmap. - if node.Status != "" { + // Once we have the node's immutable fields it can be added to the netmap. + if node.ProxyAddr != "" && node.AdminAddr != "" { + if node.Status == "" { + // Unless we've received a down/leave notification, we consider + // the node as active. 
+ node.Status = netmap.NodeStatusActive + } + delete(s.pendingNodes, node.ID) s.networkMap.AddNode(node) @@ -297,10 +396,6 @@ func (s *syncer) OnDeleteKey(nodeID, key string) { ) } -func (s *syncer) onLocalStatusUpdate(status netmap.NodeStatus) { - s.gossiper.UpsertLocal("status", string(status)) -} - func (s *syncer) onLocalEndpointUpdate(endpointID string, listeners int) { key := "endpoint:" + endpointID if listeners > 0 { diff --git a/server/gossipv2/syncer_test.go b/server/gossipv2/syncer_test.go index e79ca22..f0389e4 100644 --- a/server/gossipv2/syncer_test.go +++ b/server/gossipv2/syncer_test.go @@ -34,7 +34,6 @@ var _ gossiper = &fakeGossiper{} func TestSyncer_Sync(t *testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -53,40 +52,15 @@ func TestSyncer_Sync(t *testing.T) { []upsert{ {"proxy_addr", "10.26.104.56:8000"}, {"admin_addr", "10.26.104.56:8001"}, - {"status", "active"}, {"endpoint:my-endpoint", "3"}, }, gossiper.upserts, ) } -func TestSyncer_OnLocalStatusUpdate(t *testing.T) { - localNode := &netmap.Node{ - ID: "local", - Status: netmap.NodeStatusJoining, - ProxyAddr: "10.26.104.56:8000", - AdminAddr: "10.26.104.56:8001", - } - m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) - - sync := newSyncer(m, log.NewNopLogger()) - - gossiper := &fakeGossiper{} - sync.Sync(gossiper) - - m.UpdateLocalStatus(netmap.NodeStatusActive) - - assert.Equal( - t, - upsert{"status", "active"}, - gossiper.upserts[len(gossiper.upserts)-1], - ) -} - func TestSyncer_OnLocalEndpointUpdate(t *testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -126,11 +100,10 @@ func TestSyncer_OnLocalEndpointUpdate(t *testing.T) { ) } -func TestSyncer_UpdateRemoteNode(t *testing.T) { +func TestSyncer_RemoteNodeUpdate(t *testing.T) { t.Run("add node", func(t *testing.T) 
{ localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -145,7 +118,6 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - sync.OnUpsertKey("remote", "status", string(netmap.NodeStatusActive)) node, ok := m.Node("remote") assert.True(t, ok) @@ -160,10 +132,9 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { }) }) - t.Run("add node missing status", func(t *testing.T) { + t.Run("add node missing state", func(t *testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -176,8 +147,6 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { sync.OnJoin("remote") sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") - sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") - sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") // We don't have the node status therefore it is still pending. 
_, ok := m.Node("remote") @@ -187,7 +156,7 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { t.Run("add local node", func(t *testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, + Status: netmap.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -202,7 +171,6 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { sync.OnJoin("local") sync.OnUpsertKey("local", "proxy_addr", "10.26.104.98:8000") sync.OnUpsertKey("local", "admin_addr", "10.26.104.98:8001") - sync.OnUpsertKey("local", "status", string(netmap.NodeStatusActive)) assert.Equal(t, localNode, m.LocalNode()) }) @@ -210,7 +178,7 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { t.Run("update node", func(t *testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, + Status: netmap.NodeStatusActive, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -225,12 +193,10 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - sync.OnUpsertKey("remote", "status", string(netmap.NodeStatusActive)) _, ok := m.Node("remote") assert.True(t, ok) - sync.OnUpsertKey("remote", "status", string(netmap.NodeStatusLeaving)) sync.OnUpsertKey("remote", "endpoint:my-endpoint-2", "8") sync.OnDeleteKey("remote", "endpoint:my-endpoint") @@ -238,7 +204,7 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { assert.True(t, ok) assert.Equal(t, node, &netmap.Node{ ID: "remote", - Status: netmap.NodeStatusLeaving, + Status: netmap.NodeStatusActive, ProxyAddr: "10.26.104.98:8000", AdminAddr: "10.26.104.98:8001", Endpoints: map[string]int{ @@ -246,11 +212,12 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { }, }) }) +} - t.Run("remove node", func(t *testing.T) { +func TestSyncer_RemoteNodeLeave(t *testing.T) { + t.Run("active node leave", func(t 
*testing.T) { localNode := &netmap.Node{ ID: "local", - Status: netmap.NodeStatusJoining, ProxyAddr: "10.26.104.56:8000", AdminAddr: "10.26.104.56:8001", } @@ -261,15 +228,238 @@ func TestSyncer_UpdateRemoteNode(t *testing.T) { gossiper := &fakeGossiper{} sync.Sync(gossiper) + // Add remote node. sync.OnJoin("remote") sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") - sync.OnUpsertKey("remote", "status", string(netmap.NodeStatusActive)) + // Leaving should update the netmap. sync.OnLeave("remote") + node, ok := m.Node("remote") + assert.True(t, ok) + assert.Equal(t, node, &netmap.Node{ + ID: "remote", + Status: netmap.NodeStatusLeft, + ProxyAddr: "10.26.104.98:8000", + AdminAddr: "10.26.104.98:8001", + Endpoints: map[string]int{ + "my-endpoint": 5, + }, + }) + + sync.OnExpired("remote") + + // Expiring should remove from the netmap. + _, ok = m.Node("remote") + assert.False(t, ok) + }) + + t.Run("pending node leave", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Add remote node. + sync.OnJoin("remote") + sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") + + // Leaving should discard the pending node. 
+ sync.OnLeave("remote") + + sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") + _, ok := m.Node("remote") assert.False(t, ok) }) + + t.Run("local node leave", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + Status: netmap.NodeStatusActive, + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Attempting to mark the local node as left should have no affect. + sync.OnLeave("local") + + assert.Equal(t, localNode, m.LocalNode()) + }) +} + +func TestSyncer_RemoteNodeDown(t *testing.T) { + t.Run("active node", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Add remote node. + sync.OnJoin("remote") + sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") + sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") + sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") + + // Marking a node down should update the netmap. + sync.OnDown("remote") + + node, ok := m.Node("remote") + assert.True(t, ok) + assert.Equal(t, node, &netmap.Node{ + ID: "remote", + Status: netmap.NodeStatusDown, + ProxyAddr: "10.26.104.98:8000", + AdminAddr: "10.26.104.98:8001", + Endpoints: map[string]int{ + "my-endpoint": 5, + }, + }) + + sync.OnExpired("remote") + + // Expiring should remove from the netmap. 
+ _, ok = m.Node("remote") + assert.False(t, ok) + }) + + t.Run("active node recovers", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Add remote node. + sync.OnJoin("remote") + sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") + sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") + sync.OnUpsertKey("remote", "endpoint:my-endpoint", "5") + + // Marking a node down should update the netmap. + sync.OnDown("remote") + + // Marking a node healthy should update the netmap. + sync.OnHealthy("remote") + + node, ok := m.Node("remote") + assert.True(t, ok) + assert.Equal(t, node, &netmap.Node{ + ID: "remote", + Status: netmap.NodeStatusActive, + ProxyAddr: "10.26.104.98:8000", + AdminAddr: "10.26.104.98:8001", + Endpoints: map[string]int{ + "my-endpoint": 5, + }, + }) + }) + + t.Run("pending node down", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Add remote node. + sync.OnJoin("remote") + sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") + + // Marking down should not remove the pending node. 
+ sync.OnDown("remote") + sync.OnHealthy("remote") + + sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") + + node, ok := m.Node("remote") + assert.True(t, ok) + assert.Equal(t, node, &netmap.Node{ + ID: "remote", + Status: netmap.NodeStatusActive, + ProxyAddr: "10.26.104.98:8000", + AdminAddr: "10.26.104.98:8001", + }) + }) + + t.Run("pending node expires", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Add remote node. + sync.OnJoin("remote") + sync.OnUpsertKey("remote", "proxy_addr", "10.26.104.98:8000") + + // Marking down should not remove the pending node. + sync.OnDown("remote") + sync.OnExpired("remote") + + sync.OnUpsertKey("remote", "admin_addr", "10.26.104.98:8001") + + _, ok := m.Node("remote") + assert.False(t, ok) + }) + + t.Run("local node down", func(t *testing.T) { + localNode := &netmap.Node{ + ID: "local", + Status: netmap.NodeStatusActive, + ProxyAddr: "10.26.104.56:8000", + AdminAddr: "10.26.104.56:8001", + } + m := netmap.NewNetworkMap(localNode.Copy(), log.NewNopLogger()) + + sync := newSyncer(m, log.NewNopLogger()) + + gossiper := &fakeGossiper{} + sync.Sync(gossiper) + + // Attempting to mark the local node as down should have no effect. + sync.OnDown("local") + + assert.Equal(t, localNode, m.LocalNode()) + }) } diff --git a/server/netmapv2/netmap.go b/server/netmapv2/netmap.go index 6011318..d0511dc 100644 --- a/server/netmapv2/netmap.go +++ b/server/netmapv2/netmap.go @@ -16,7 +16,6 @@ type NetworkMap struct { localID string nodes map[string]*Node - localStatusSubscribers []func(NodeStatus) localEndpointSubscribers []func(endpointID string, listeners int) // mu protects the above fields. 
@@ -31,6 +30,8 @@ func NewNetworkMap( localNode *Node, logger log.Logger, ) *NetworkMap { + // The local node is always active. + localNode.Status = NodeStatusActive nodes := make(map[string]*Node) nodes[localNode.ID] = localNode @@ -87,36 +88,6 @@ func (m *NetworkMap) Nodes() []*Node { return nodes } -// UpdateLocalStatus sets the status of the local node. -func (m *NetworkMap) UpdateLocalStatus(status NodeStatus) { - m.mu.Lock() - defer m.mu.Unlock() - - node, ok := m.nodes[m.localID] - if !ok { - panic("local node not in netmap") - } - - oldStatus := node.Status - node.Status = status - m.updateMetricsEntry(oldStatus, status) - - for _, f := range m.localStatusSubscribers { - f(node.Status) - } -} - -// OnLocalStatusUpdate subscribes to changes to the local node status. -// -// The callback is called with the netmap mutex locked so must not block or -// call back to the netmap. -func (m *NetworkMap) OnLocalStatusUpdate(f func(NodeStatus)) { - m.mu.Lock() - defer m.mu.Unlock() - - m.localStatusSubscribers = append(m.localStatusSubscribers, f) -} - // AddLocalEndpoint adds the active endpoint to the local node state. 
func (m *NetworkMap) AddLocalEndpoint(endpointID string) { m.mu.Lock() diff --git a/server/netmapv2/netmap_test.go b/server/netmapv2/netmap_test.go index 75d1891..9ac185a 100644 --- a/server/netmapv2/netmap_test.go +++ b/server/netmapv2/netmap_test.go @@ -26,25 +26,6 @@ func TestNetworkMap_LocalNode(t *testing.T) { assert.Equal(t, []*Node{localNode}, m.Nodes()) } -func TestNetworkMap_UpdateLocalStatus(t *testing.T) { - localNode := &Node{ - ID: "local", - Status: NodeStatusActive, - } - m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) - - var notifiedStatus NodeStatus - m.OnLocalStatusUpdate(func(status NodeStatus) { - notifiedStatus = status - }) - m.UpdateLocalStatus(NodeStatusLeaving) - - assert.Equal(t, NodeStatusLeaving, notifiedStatus) - - n, _ := m.Node("local") - assert.Equal(t, NodeStatusLeaving, n.Status) -} - func TestNetworkMap_UpdateLocalEndpoint(t *testing.T) { localNode := &Node{ ID: "local", @@ -101,7 +82,7 @@ func TestNetworkMap_AddNode(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusLeaving, + Status: NodeStatusActive, } m.AddNode(newNode) n, ok := m.Node("remote") @@ -125,14 +106,14 @@ func TestNetworkMap_AddNode(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusLeaving, + Status: NodeStatusActive, } m.AddNode(newNode) // Attempting to add a node with the same ID should succeed. newNode = &Node{ ID: "remote", - Status: NodeStatusJoining, + Status: NodeStatusDown, } m.AddNode(newNode) @@ -152,7 +133,7 @@ func TestNetworkMap_AddNode(t *testing.T) { // should not be updated. 
newNode := &Node{ ID: "local", - Status: NodeStatusLeaving, + Status: NodeStatusActive, } m.AddNode(newNode) assert.Equal(t, localNode, m.LocalNode()) @@ -169,7 +150,7 @@ func TestNetworkMap_RemoveNode(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusLeaving, + Status: NodeStatusDown, } m.AddNode(newNode) assert.True(t, m.RemoveNode(newNode.ID)) @@ -202,13 +183,13 @@ func TestNetworkMap_UpdateRemoteStatus(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusJoining, + Status: NodeStatusActive, } m.AddNode(newNode) - assert.True(t, m.UpdateRemoteStatus("remote", NodeStatusActive)) + assert.True(t, m.UpdateRemoteStatus("remote", NodeStatusDown)) n, _ := m.Node("remote") - assert.Equal(t, NodeStatusActive, n.Status) + assert.Equal(t, NodeStatusDown, n.Status) }) t.Run("update local status", func(t *testing.T) { @@ -219,7 +200,7 @@ func TestNetworkMap_UpdateRemoteStatus(t *testing.T) { m := NewNetworkMap(localNode.Copy(), log.NewNopLogger()) // Attempting to update the local node should have no affect. - assert.False(t, m.UpdateRemoteStatus("local", NodeStatusLeaving)) + assert.False(t, m.UpdateRemoteStatus("local", NodeStatusDown)) assert.Equal(t, localNode, m.LocalNode()) }) } @@ -234,7 +215,7 @@ func TestNetworkMap_UpdateRemoteEndpoint(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusJoining, + Status: NodeStatusDown, } m.AddNode(newNode) assert.True(t, m.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) @@ -266,7 +247,7 @@ func TestNetworkMap_RemoveRemoteEndpoint(t *testing.T) { newNode := &Node{ ID: "remote", - Status: NodeStatusJoining, + Status: NodeStatusActive, } m.AddNode(newNode) assert.True(t, m.UpdateRemoteEndpoint("remote", "my-endpoint", 7)) diff --git a/server/netmapv2/node.go b/server/netmapv2/node.go index fbcbb34..e759e32 100644 --- a/server/netmapv2/node.go +++ b/server/netmapv2/node.go @@ -1,17 +1,13 @@ package netmap -// NodeStatus contains the known status as set by the node itself. 
+// NodeStatus contains the known status of a node. type NodeStatus string const ( - // NodeStatusJoining means the node is joining the cluster though is not - // yet accepting traffic. - NodeStatusJoining NodeStatus = "joining" // NodeStatusActive means the node is healthy and accepting traffic. NodeStatusActive NodeStatus = "active" - // NodeStatusLeaving means the node is leaving the cluster and no longer - // accepting traffic. - NodeStatusLeaving NodeStatus = "leaving" + // NodeStatusDown means the node is considered down. + NodeStatusDown NodeStatus = "down" // NodeStatusLeft means the node has left the cluster. NodeStatusLeft NodeStatus = "left" ) @@ -26,7 +22,7 @@ type Node struct { // The ID is immutable. ID string `json:"id"` - // Status contains the node status as set by the node itself. + // Status contains the known status of the node. Status NodeStatus `json:"status"` // ProxyAddr is the advertised proxy address.