Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #13 from BlockscapeLab/leave_event_handling
Browse files Browse the repository at this point in the history
Leave event handling
  • Loading branch information
BlockscapeLab authored Aug 27, 2020
2 parents c046bcc + e4b6d68 commit 576c37c
Show file tree
Hide file tree
Showing 20 changed files with 386 additions and 80 deletions.
16 changes: 10 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

---

\-
### General Changes

* Added custom handling for intended leave events in order to distinguish them better from crash-related ones

## v0.2.0

Expand All @@ -16,19 +18,21 @@

### Bugfixes

* Fixed a bug that prevented nodes with `expect = 1` from becoming the cluster leader if there were other peers listed in the peerlist of the raftify.json file
* Fixed a bug that prevented the prevote quorum from being adjusted to the new cluster size
* Fixed a bug that caused a node to get stuck after a rejoin during operation
* Merge pull request #5: Fixed a bug that prevented nodes with `expect = 1` from becoming the cluster leader if there were other peers listed in the peerlist of the raftify.json file
* Merge pull request #9: Fixed a bug that prevented the prevote quorum from being adjusted to the new cluster size
* Merge pull request #10: Fixed a bug that caused a node to get stuck after a rejoin during operation

### General Changes

* Bump to memberlist `v0.2.2`
* Added version info on startup
* Merge pull request #11: Added version info on startup

### Testing

* Added more unit tests for more stable code coverage
* Merge pull request #7: Added more unit tests for more stable code coverage

## v0.1.0

---

* First release
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ unit-tests:
# Integration Tests
integration-tests:
@echo "Running integration tests for Raftify..."
@go test -v -parallel=1 helpers_test.go api.go bootstrap.go candidate.go config.go follower.go handlers.go leader.go lists.go messages.go node.go precandidate.go rejoin.go shutdown.go state.go types.go util.go node_integration_test.go
@go test -v -parallel=1 helpers_test.go api.go bootstrap.go candidate.go config.go follower.go handlers.go leader.go lists.go messages.go node.go precandidate.go preshutdown.go rejoin.go shutdown.go state.go types.go util.go version.go node_integration_test.go
@echo "Tests finished"
4 changes: 2 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ func InitNode(logger *log.Logger, workingDir string) (*Node, error) {
// Shutdown stops all timers/tickers and listeners, closes channels, leaves the
// memberlist and shuts down the node.
func (n *Node) Shutdown() error {
n.shutdownCh <- nil // Initiates switch to Shutdown state
err := <-n.shutdownCh // Waits for response from node in Shutdown state
n.shutdownCh <- nil // Initiates switch to PreShutdown state
err := <-n.shutdownCh // Waits for response from node in PreShutdown state
return err
}

Expand Down
2 changes: 1 addition & 1 deletion bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ func (n *Node) runBootstrap() {
}

case <-n.shutdownCh:
n.toShutdown()
n.toPreShutdown()
}
}
30 changes: 0 additions & 30 deletions bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,6 @@ import (
"time"
)

// TestTryJoin checks tryJoin for two dummy nodes that list each other as their
// only peer: the first call is expected to return an error because the peer's
// memberlist has not been created yet, the second call is expected to succeed.
func TestTryJoin(t *testing.T) {
	// One port per dummy node
	reserved := reservePorts(2)

	first := initDummyNode("TestNode_1", 1, 2, reserved[0])
	second := initDummyNode("TestNode_2", 1, 2, reserved[1])

	// Each node gets the other one as its sole peerlist entry
	first.config.PeerList = []string{fmt.Sprintf("127.0.0.1:%v", second.config.BindPort)}
	second.config.PeerList = []string{fmt.Sprintf("127.0.0.1:%v", first.config.BindPort)}

	// Only the first node has a memberlist at this point, so joining the second must fail
	first.createMemberlist()
	defer first.memberlist.Shutdown()

	err := first.tryJoin()
	if err == nil {
		t.Fatalf("Expected node1 to throw an error on tryJoin, instead error was nil")
	}

	// With the first node already up, the second node must be able to join it
	second.createMemberlist()
	defer second.memberlist.Shutdown()

	err = second.tryJoin()
	if err != nil {
		t.Fatalf("Expected node2 to successfully join node1, instead got error: %v", err.Error())
	}
}

func TestToBootstrap(t *testing.T) {
// Reserve ports for this test
ports := reservePorts(2)
Expand Down
10 changes: 9 additions & 1 deletion candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ func (n *Node) runCandidate() {
}
n.handleVoteResponse(content)

case NewQuorumMsg:
var content NewQuorum
if err := json.Unmarshal(msg.Content, &content); err != nil {
n.logger.Printf("[ERR] raftify: error while unmarshaling new quorum message: %v\n", err.Error())
break
}
n.handleNewQuorum(content)

default:
n.logger.Printf("[WARN] raftify: received %v as candidate, discarding...\n", msg.Type.toString())
}
Expand All @@ -90,6 +98,6 @@ func (n *Node) runCandidate() {
n.saveState()

case <-n.shutdownCh:
n.toShutdown()
n.toPreShutdown()
}
}
10 changes: 9 additions & 1 deletion follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func (n *Node) runFollower() {
}
n.handleVoteRequest(content)

case NewQuorumMsg:
var content NewQuorum
if err := json.Unmarshal(msg.Content, &content); err != nil {
n.logger.Printf("[ERR] raftify: error while unmarshaling new quorum message: %v\n", err.Error())
break
}
n.handleNewQuorum(content)

default:
n.logger.Printf("[WARN] raftify: received %v as follower, discarding...\n", msg.Type.toString())
}
Expand All @@ -64,6 +72,6 @@ func (n *Node) runFollower() {
n.saveState()

case <-n.shutdownCh:
n.toShutdown()
n.toPreShutdown()
}
}
37 changes: 37 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package raftify

import (
"fmt"

"github.com/hashicorp/memberlist"
)

// handleHeartbeat handles the receival of a heartbeat message from a leader.
Expand Down Expand Up @@ -190,3 +192,38 @@ func (n *Node) handleVoteResponse(msg VoteResponse) {
n.logger.Printf("[DEBUG] raftify: Received vote response from %v (not granted)\n", msg.FollowerID)
}
}

// handleNewQuorum handles the receipt of a new quorum message from a node in the PreShutdown state.
//
// It takes the next event from n.events.eventCh and applies msg.NewQuorum only if that event
// is the leave event of the node named in msg.LeavingID. Any other event is logged and the
// message is discarded without touching the quorum. An accepted quorum is stored and persisted
// via saveState; if it equals 1, the node puts itself into the Leader state directly.
//
// NOTE(review): the receive on n.events.eventCh blocks until an event arrives, and an event
// that does not match the announced leave is consumed here without any further processing.
// Confirm that both are intended.
func (n *Node) handleNewQuorum(msg NewQuorum) {
	n.logger.Printf("[DEBUG] raftify: Received new quorum, waiting for %v to leave...\n", msg.LeavingID)

	event := <-n.events.eventCh

	// Never dereference event.Node without checking it first
	if event.Node == nil {
		n.logger.Printf("[ERR] raftify: Received event without node information, expected leave from %v\n", msg.LeavingID)
		return
	}

	// If the event is not the leave event fired by the node that announced its exit, do nothing
	if event.Event != memberlist.NodeLeave || event.Node.Name != msg.LeavingID {
		switch event.Event {
		case memberlist.NodeJoin:
			n.logger.Printf("[ERR] raftify: Unexpected join event from %v\n", event.Node.Name)
		case memberlist.NodeUpdate:
			n.logger.Printf("[ERR] raftify: Unexpected update event from %v\n", event.Node.Name)
		case memberlist.NodeLeave:
			n.logger.Printf("[ERR] raftify: Unexpected leave event from %v, expected leave from %v\n", event.Node.Name, msg.LeavingID)
		}
		return
	}

	// The receiving node itself is still part of the cluster, so a quorum below 1 cannot be
	// valid. Refuse to store and persist such a value.
	if msg.NewQuorum < 1 {
		n.logger.Printf("[ERR] raftify: Received invalid new quorum %v from %v, discarding...\n", msg.NewQuorum, msg.LeavingID)
		return
	}

	n.logger.Printf("[DEBUG] raftify: Setting the quorum from %v to %v\n", n.quorum, msg.NewQuorum)
	n.quorum = msg.NewQuorum
	n.saveState()

	if msg.NewQuorum == 1 {
		n.logger.Printf("[DEBUG] raftify: %v is the only node left in the cluster, entering leader state for term %v...", n.config.ID, n.currentTerm)

		// Switch to the Leader state without calling toLeader in order to bypass the state change
		// restriction in this corner case.
		n.timeoutTimer.Stop()  // Leaders have no timeout
		n.startMessageTicker() // Used to periodically send out heartbeat messages
		n.heartbeatIDList.reset()

		n.votedFor = ""
		n.state = Leader
	}
}
87 changes: 87 additions & 0 deletions handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package raftify
import (
"testing"
"time"

"github.com/hashicorp/memberlist"
)

func TestHandleHeartbeatAsFollower(t *testing.T) {
Expand Down Expand Up @@ -434,3 +436,88 @@ func TestHandleVoteResponse(t *testing.T) {
t.FailNow()
}
}

// TestHandleNewQuorum drives handleNewQuorum through four scenarios on a single
// dummy node: a matching leave event with a quorum greater than 1, a matching
// leave event with a quorum of 1, a join event, and a leave event from a node
// other than the announced one. The last two must leave the quorum untouched.
func TestHandleNewQuorum(t *testing.T) {
	// A single port is enough, only one node takes part
	reserved := reservePorts(1)

	dummy := initDummyNode("TestNode", 1, 1, reserved[0])
	dummy.createMemberlist()
	defer dummy.memberlist.Shutdown()

	quorumMsg := NewQuorum{
		NewQuorum: 2,
		LeavingID: "TestNode",
	}

	defer dummy.deleteState()

	// deliver queues one event of the given kind for the given sender and then
	// hands the current quorum message to the node.
	deliver := func(kind memberlist.NodeEventType, sender string) {
		dummy.events.eventCh <- memberlist.NodeEvent{
			Event: kind,
			Node:  &memberlist.Node{Name: sender},
		}
		dummy.handleNewQuorum(quorumMsg)
	}

	// Scenario 1: expected leave event, new quorum greater than 1
	deliver(memberlist.NodeLeave, "TestNode")

	if dummy.quorum != 2 {
		t.Fatalf("Expected the quorum to be 2, instead got %v", dummy.quorum)
	}
	if dummy.state == Leader {
		t.Fatalf("Expected node to be in any other state but the Leader state, instead got %v", dummy.state.toString())
	}

	// Scenario 2: expected leave event, new quorum of exactly 1
	quorumMsg.NewQuorum = 1
	deliver(memberlist.NodeLeave, "TestNode")

	if dummy.quorum != 1 {
		t.Fatalf("Expected the quorum to be 1, instead got %v", dummy.quorum)
	}
	if dummy.state != Leader {
		t.Fatalf("Expected node to be in the Leader state, instead got %v", dummy.state.toString())
	}

	// Scenario 3: a join event instead of a leave event must not change the quorum
	quorumMsg.NewQuorum = 0
	deliver(memberlist.NodeJoin, "TestNode")

	if dummy.quorum != 1 {
		t.Fatalf("Expected quorum to have stayed 1, instead got %v", dummy.quorum)
	}

	// Scenario 4: a leave event from a different node must not change the quorum either
	deliver(memberlist.NodeLeave, "WrongTestNode")

	if dummy.quorum != 1 {
		t.Fatalf("Expected quorum to have stayed 1, instead got %v", dummy.quorum)
	}
}
10 changes: 9 additions & 1 deletion leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (n *Node) runLeader() {
}
n.handleVoteRequest(content)

case NewQuorumMsg:
var content NewQuorum
if err := json.Unmarshal(msg.Content, &content); err != nil {
n.logger.Printf("[ERR] raftify: error while unmarshaling new quorum message: %v\n", err.Error())
break
}
n.handleNewQuorum(content)

default:
n.logger.Printf("[WARN] raftify: received %v as leader, discarding...\n", msg.Type.toString())
}
Expand Down Expand Up @@ -107,6 +115,6 @@ func (n *Node) runLeader() {
n.saveState()

case <-n.shutdownCh:
n.toShutdown()
n.toPreShutdown()
}
}
38 changes: 32 additions & 6 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package raftify

import (
"encoding/json"
"fmt"

"github.com/hashicorp/memberlist"
)
Expand Down Expand Up @@ -57,6 +56,13 @@ type VoteResponse struct {
VoteGranted bool `json:"vote_granted"`
}

// NewQuorum defines the message sent out by a node that is voluntarily leaving the cluster,
// triggering an immediate quorum change. This does not include crash-related leave events.
type NewQuorum struct {
// NewQuorum is the quorum value the receiving nodes adopt once the leave event
// of the announcing node has been observed.
NewQuorum int `json:"new_quorum"`
// LeavingID is the ID of the node that announced its exit; the sender fills in
// its own config ID.
LeavingID string `json:"leaving_id"`
}

// sendHeartbeatToAll sends a heartbeat message to all the other cluster members.
func (n *Node) sendHeartbeatToAll() {
n.heartbeatIDList.reset()
Expand Down Expand Up @@ -220,12 +226,32 @@ func (n *Node) sendVoteResponse(candidateid string, grant bool) {
}
}

// getNodeByName returns the full Node struct from memberlist to the specified name.
func (n *Node) getNodeByName(name string) (*memberlist.Node, error) {
// sendNewQuorumToAll sends the new quorum to the rest of the cluster triggered by a voluntary
// leave event. Once memberlist has processed the leave event internally, this message is used
// to trigger an immediate change of the new quorum instead of waiting for the dead node to
// be kicked. This function returns the number of nodes that the new quorum could be sent to.
func (n *Node) sendNewQuorumToAll(newquorum int) int {
nqBytes, _ := json.Marshal(NewQuorum{
NewQuorum: newquorum,
LeavingID: n.config.ID,
})
msgBytes, _ := json.Marshal(Message{
Type: NewQuorumMsg,
Content: nqBytes,
})

// Count how many members received the new quorum message
membersReached := 0

for _, member := range n.memberlist.Members() {
if name == member.Name {
return member, nil
if member.Name == n.config.ID {
continue
}
if err := n.memberlist.SendReliable(member, msgBytes); err != nil {
n.logger.Printf("[ERR] raftify: couldn't send new quorum to %v: %v\n", member.Name, err.Error())
continue
}
membersReached++
}
return nil, fmt.Errorf("couldn't find %v in the local memberlist", name)
return membersReached
}
Loading

0 comments on commit 576c37c

Please sign in to comment.