Skip to content

Commit

Permalink
gossip: add member expire support
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 27, 2024
1 parent 8c7f2ef commit 2f55f45
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 150 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ lint:
.PHONY: generate
generate:
protoc --go_out=. --go_opt=paths=source_relative api/rpc.proto

.PHONY: coverage
coverage:
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out
3 changes: 0 additions & 3 deletions cli/server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ func runv2(conf *config.Config, logger log.Logger) error {

networkMap := netmapv2.NewNetworkMap(&netmapv2.Node{
ID: conf.Cluster.NodeID,
Status: netmapv2.NodeStatusActive,
ProxyAddr: conf.Proxy.AdvertiseAddr,
AdminAddr: conf.Admin.AdvertiseAddr,
}, logger)
Expand Down Expand Up @@ -488,8 +487,6 @@ func runv2(conf *config.Config, logger log.Logger) error {
zap.String("signal", sig.String()),
)

networkMap.UpdateLocalStatus(netmapv2.NodeStatusLeft)

// Leave as soon as we receive the shutdown signal to avoid receiving
// forward proxy requests.
if err := gossip.Leave(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/andydunstall/pico
go 1.21.1

require (
github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac
github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f
github.com/gin-gonic/gin v1.9.1
github.com/goccy/go-yaml v1.11.3
github.com/gorilla/websocket v1.5.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac h1:N8JXYaQmKcOUF943KzmOIwKCw+uu+WL7yvNbA+wjZZc=
github.com/andydunstall/kite v0.0.0-20240426052643-5e213b67e3ac/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I=
github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f h1:VFrRSjIOVtFHY9fe7Sqji747Bn5Lbe3r1u5ydPnYAtI=
github.com/andydunstall/kite v0.0.0-20240427082845-91285793db0f/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/sonic v1.11.5 h1:G00FYjjqll5iQ1PYXynbg/hyzqBqavH8Mo9/oTopd9k=
Expand Down
4 changes: 4 additions & 0 deletions server/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (w *kiteWatcher) OnHealthy(_ string) {
// TODO(andydunstall)
}

func (w *kiteWatcher) OnExpired(_ string) {
// TODO(andydunstall)
}

func (w *kiteWatcher) OnDown(memberID string) {
w.gossip.onRemoteDown(memberID)
}
Expand Down
161 changes: 128 additions & 33 deletions server/gossipv2/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type gossiper interface {
//
// When a node joins, it is considered 'pending' so not added to the netmap
// until we have the full node state. Since gossip propagates state updates in
// order, we always send the initial node state status last, meaning if we
// have the status of a remote node, we'll have its other immutable fields.
// order, we only add a node to the netmap when we have the required immutable
// fields.
type syncer struct {
// pendingNodes contains nodes that we haven't received the full state for
// yet so can't be added to the netmap.
Expand All @@ -48,16 +48,12 @@ func newSyncer(networkMap *netmap.NetworkMap, logger log.Logger) *syncer {
func (s *syncer) Sync(gossiper gossiper) {
s.gossiper = gossiper

s.networkMap.OnLocalStatusUpdate(s.onLocalStatusUpdate)
s.networkMap.OnLocalEndpointUpdate(s.onLocalEndpointUpdate)

localNode := s.networkMap.LocalNode()
// First add immutable fields.
s.gossiper.UpsertLocal("proxy_addr", localNode.ProxyAddr)
s.gossiper.UpsertLocal("admin_addr", localNode.AdminAddr)
// Next add status, which means receiving nodes will consider the node
// state 'complete' so add to the netmap.
s.gossiper.UpsertLocal("status", string(localNode.Status))
// Finally add mutable fields.
for endpointID, listeners := range localNode.Endpoints {
key := "endpoint:" + endpointID
Expand Down Expand Up @@ -103,9 +99,17 @@ func (s *syncer) OnJoin(nodeID string) {
}

func (s *syncer) OnLeave(nodeID string) {
if deleted := s.networkMap.RemoveNode(nodeID); deleted {
if nodeID == s.networkMap.LocalID() {
s.logger.Warn(
"node healthy; same id as local node",
zap.String("node-id", nodeID),
)
return
}

if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusLeft); updated {
s.logger.Info(
"node leave; removed from netmap",
"node leave; updated netmap",
zap.String("node-id", nodeID),
)
return
Expand All @@ -114,6 +118,7 @@ func (s *syncer) OnLeave(nodeID string) {
s.mu.Lock()
defer s.mu.Unlock()

// If a pending node has left it can be discarded.
_, ok := s.pendingNodes[nodeID]
if ok {
delete(s.pendingNodes, nodeID)
Expand All @@ -130,17 +135,114 @@ func (s *syncer) OnLeave(nodeID string) {
}
}

func (s *syncer) OnHealthy(_ string) {
// TODO(andydunstall):
// If a node goes down then comes back, need to ensure we still have the
// state of that node. Therefore if a node is down, mark it as down in the
// netmap but don't remove. Wait for Kite to send OnLeave before actually
// removing from the netmap (since we know Kite will do an OnJoin if it
// comes back). Therefore need to update Kite.
func (s *syncer) OnHealthy(nodeID string) {
if nodeID == s.networkMap.LocalID() {
s.logger.Warn(
"node healthy; same id as local node",
zap.String("node-id", nodeID),
)
return
}

if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusActive); updated {
s.logger.Info(
"node helathy; updated netmap",
zap.String("node-id", nodeID),
)
return
}

s.mu.Lock()
defer s.mu.Unlock()

pending, ok := s.pendingNodes[nodeID]
if ok {
pending.Status = netmap.NodeStatusActive

s.logger.Info(
"node healthy; updated pending",
zap.String("node-id", nodeID),
)
} else {
s.logger.Warn(
"node healthy; unknown node",
zap.String("node-id", nodeID),
)
}
}

func (s *syncer) OnDown(nodeID string) {
if nodeID == s.networkMap.LocalID() {
s.logger.Warn(
"node down; same id as local node",
zap.String("node-id", nodeID),
)
return
}

if updated := s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatusDown); updated {
s.logger.Info(
"node down; updated netmap",
zap.String("node-id", nodeID),
)
return
}

s.mu.Lock()
defer s.mu.Unlock()

// Update pending status. We must still retain the pending node as it may
// come back.
pending, ok := s.pendingNodes[nodeID]
if ok {
pending.Status = netmap.NodeStatusDown

s.logger.Info(
"node down; updated pending",
zap.String("node-id", nodeID),
)
} else {
s.logger.Warn(
"node down; unknown node",
zap.String("node-id", nodeID),
)
}
}

func (s *syncer) OnDown(_ string) {
// TODO(andydunstall): See above.
func (s *syncer) OnExpired(nodeID string) {
if nodeID == s.networkMap.LocalID() {
s.logger.Warn(
"node expired; same id as local node",
zap.String("node-id", nodeID),
)
return
}

if removed := s.networkMap.RemoveNode(nodeID); removed {
s.logger.Info(
"node expired; removed from netmap",
zap.String("node-id", nodeID),
)
return
}

s.mu.Lock()
defer s.mu.Unlock()

_, ok := s.pendingNodes[nodeID]
if ok {
delete(s.pendingNodes, nodeID)

s.logger.Info(
"node expired; removed from pending",
zap.String("node-id", nodeID),
)
} else {
s.logger.Warn(
"node expired; unknown node",
zap.String("node-id", nodeID),
)
}
}

func (s *syncer) OnUpsertKey(nodeID, key, value string) {
Expand All @@ -155,11 +257,7 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) {

// First check if the node is already in the netmap. Only check mutable
// fields.
if key == "status" {
if s.networkMap.UpdateRemoteStatus(nodeID, netmap.NodeStatus(value)) {
return
}
} else if strings.HasPrefix(key, "endpoint:") {
if strings.HasPrefix(key, "endpoint:") {
endpointID, _ := strings.CutPrefix(key, "endpoint:")
listeners, err := strconv.Atoi(value)
if err != nil {
Expand Down Expand Up @@ -188,10 +286,6 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) {
zap.String("value", value),
)
return
}

if key == "status" {
node.Status = netmap.NodeStatus(value)
} else if key == "proxy_addr" {
node.ProxyAddr = value
} else if key == "admin_addr" {
Expand Down Expand Up @@ -221,9 +315,14 @@ func (s *syncer) OnUpsertKey(nodeID, key, value string) {
return
}

// Once we have the node status for the pending node, it can be added to
// the netmap.
if node.Status != "" {
// Once we have the nodes immutable fields it can be added to the netmap.
if node.ProxyAddr != "" && node.AdminAddr != "" {
if node.Status == "" {
// Unless we've received a down/leave notification, we consider
// the node as active.
node.Status = netmap.NodeStatusActive
}

delete(s.pendingNodes, node.ID)
s.networkMap.AddNode(node)

Expand Down Expand Up @@ -297,10 +396,6 @@ func (s *syncer) OnDeleteKey(nodeID, key string) {
)
}

func (s *syncer) onLocalStatusUpdate(status netmap.NodeStatus) {
s.gossiper.UpsertLocal("status", string(status))
}

func (s *syncer) onLocalEndpointUpdate(endpointID string, listeners int) {
key := "endpoint:" + endpointID
if listeners > 0 {
Expand Down
Loading

0 comments on commit 2f55f45

Please sign in to comment.