Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leader node names to v1beta2.nodepool #2297

Merged
merged 4 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion charts/yurt-manager/crds/apps.openyurt.io_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,18 @@ spec:
leaderEndpoints:
description: LeaderEndpoints is used for storing the address of Leader Yurthub.
items:
type: string
description: Leader represents the hub leader in a nodepool
properties:
endpoint:
description: The address of the leader yurthub
type: string
nodeName:
description: The node name of the leader yurthub
type: string
required:
- endpoint
- nodeName
type: object
type: array
nodes:
description: The list of nodes' names in the pool
Expand Down
11 changes: 10 additions & 1 deletion pkg/apis/apps/v1beta2/nodepool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,23 @@ type NodePoolStatus struct {

// LeaderEndpoints is used for storing the address of Leader Yurthub.
// +optional
LeaderEndpoints []string `json:"leaderEndpoints,omitempty"`
LeaderEndpoints []Leader `json:"leaderEndpoints,omitempty"`

// Conditions represents the latest available observations of a NodePool's
// current state that includes LeaderHubElection status.
// +optional
Conditions []NodePoolCondition `json:"conditions,omitempty"`
}

// Leader represents the hub leader in a nodepool
type Leader struct {
// The node name of the leader yurthub
NodeName string `json:"nodeName"`

// The address of the leader yurthub
Endpoint string `json:"endpoint"`
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
}

// NodePoolConditionType represents a NodePool condition value.
type NodePoolConditionType string

Expand Down
17 changes: 16 additions & 1 deletion pkg/apis/apps/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions pkg/yurtmanager/controller/hubleader/hubleader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
// Copy the nodepool to update
updatedNodePool := nodepool.DeepCopy()

// Cache nodes in the list by internalIP -> Node
// Cache nodes in the list by Leader -> Node
// if they are ready and have internal IP
endpointsMap := make(map[string]*corev1.Node)
endpointsMap := make(map[appsv1beta2.Leader]*corev1.Node)
for _, n := range currentNodeList.Items {
internalIP, ok := nodeutil.GetInternalIP(&n)
if !ok {
Expand All @@ -196,12 +196,15 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
continue
}

endpointsMap[internalIP] = &n
endpointsMap[appsv1beta2.Leader{
Endpoint: internalIP,
NodeName: n.Name,
}] = &n
}

// Delete leader endpoints that are not in endpoints map
// They are either not ready or no longer in the node list and need to be removed
leaderDeleteFn := func(endpoint string) bool {
leaderDeleteFn := func(endpoint appsv1beta2.Leader) bool {
_, ok := endpointsMap[endpoint]
return !ok
}
Expand Down Expand Up @@ -246,12 +249,12 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
}

// hasLeadersChanged checks if the leader endpoints have changed
func hasLeadersChanged(old, new []string) bool {
func hasLeadersChanged(old, new []appsv1beta2.Leader) bool {
if len(old) != len(new) {
return true
}

oldSet := make(map[string]struct{}, len(old))
oldSet := make(map[appsv1beta2.Leader]struct{}, len(old))

for i := range old {
oldSet[old[i]] = struct{}{}
Expand All @@ -270,9 +273,9 @@ func hasLeadersChanged(old, new []string) bool {
func electNLeaders(
strategy string,
numLeaders int,
candidates map[string]*corev1.Node,
) ([]string, bool) {
leaderEndpoints := make([]string, 0, len(candidates))
candidates map[appsv1beta2.Leader]*corev1.Node,
) ([]appsv1beta2.Leader, bool) {
leaderEndpoints := make([]appsv1beta2.Leader, 0, len(candidates))
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved

switch strategy {
case string(appsv1beta2.ElectionStrategyMark), string(appsv1beta2.ElectionStrategyRandom):
Expand Down
106 changes: 95 additions & 11 deletions pkg/yurtmanager/controller/hubleader/hubleader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package hubleader

import (
"cmp"
"context"
"slices"
"testing"
Expand Down Expand Up @@ -266,7 +267,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.1"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP",
Endpoint: "10.0.0.1",
},
},
},
},
expectErr: false,
Expand Down Expand Up @@ -306,7 +312,16 @@ func TestReconcile(t *testing.T) {
LeaderReplicas: 2,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Endpoint: "10.0.0.5",
},
},
},
},
expectErr: false,
Expand Down Expand Up @@ -465,7 +480,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"}, // leader already set
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2", // leader already set
},
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
Expand All @@ -485,7 +505,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"}, // should not change leader as replicas met
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2", // should not change leader as replicas met
},
},
},
},
expectErr: false,
Expand All @@ -508,7 +533,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.4"}, // .4 was leader (node not ready)
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "not ready with internal IP marked as leader",
Endpoint: "10.0.0.4", // .4 was leader (node not ready)
},
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
Expand All @@ -528,7 +562,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // new leader is .5
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Endpoint: "10.0.0.5", // new leader is .5
},
},
},
},
expectErr: false,
Expand Down Expand Up @@ -568,7 +611,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // multiple marked leaders
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Endpoint: "10.0.0.5",
}, // multiple marked leaders
},
},
},
expectErr: false,
Expand Down Expand Up @@ -602,7 +654,20 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.3", "10.0.0.5"}, // multiple marked leaders
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "ready with internal IP and not marked as leader",
Endpoint: "10.0.0.3",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Endpoint: "10.0.0.5",
}, // multiple marked leaders
},
},
},
expectErr: false,
Expand All @@ -625,7 +690,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // 2 leaders set, last should be dropped
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Endpoint: "10.0.0.5",
}, // 2 leaders set, last should be dropped
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
Expand All @@ -645,7 +719,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Endpoint: "10.0.0.2",
},
},
},
},
expectErr: false,
Expand Down Expand Up @@ -682,7 +761,12 @@ func TestReconcile(t *testing.T) {
// Reset resource version - it's not important for the test
actualPool.ResourceVersion = ""
// Sort leader endpoints for comparison - it is not important for the order
slices.Sort(actualPool.Status.LeaderEndpoints)
slices.SortStableFunc(actualPool.Status.LeaderEndpoints, func(a, b appsv1beta2.Leader) int {
return cmp.Compare(
a.Endpoint,
b.Endpoint,
)
})

require.Equal(t, *tc.expectedNodePool, actualPool)
})
Expand Down
Loading