Skip to content

Commit

Permalink
fix: rewrite k8s filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorres committed Jul 16, 2024
1 parent 1a87424 commit 1bab3d1
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 45 deletions.
3 changes: 1 addition & 2 deletions pkg/client/auth/credentials/iam_creds.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"strings"
"sync"

"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
yc "github.com/ydb-platform/ydb-go-yc"
"google.golang.org/grpc/metadata"

"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
)

type iamCredsProvider struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rolling/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"github.com/ydb-platform/ydbops/internal/collections"
"github.com/ydb-platform/ydbops/pkg/options"
"github.com/ydb-platform/ydbops/pkg/profile"
"github.com/ydb-platform/ydbops/pkg/rolling/restarters"
"github.com/ydb-platform/ydbops/pkg/utils"
)

const (
DefaultRetryCount = 3
DefaultRestartDurationSeconds = 60
DefaultCMSQueryIntervalSeconds = 10
DefaultMaxStaticNodeId = 50000
)

type RestartOptions struct {
Expand Down Expand Up @@ -196,7 +196,7 @@ after that would be considered a regular cluster failure`)
for this invocation must be the same as for the previous invocation, and this can not be verified at runtime since
the ydbops utility is stateless. Use at your own risk.`)

fs.IntVar(&o.MaxStaticNodeId, "max-static-node-id", DefaultMaxStaticNodeId,
fs.IntVar(&o.MaxStaticNodeId, "max-static-node-id", restarters.DefaultMaxStaticNodeId,
`This argument is used to help ydbops distinguish storage and dynamic nodes.
Nodes with this nodeId or less will be considered storage.`)

Expand Down
10 changes: 8 additions & 2 deletions pkg/rolling/restarters/primitives.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/ydb-platform/ydbops/pkg/options"
)

const (
// DefaultMaxStaticNodeId is the default value of the --max-static-node-id
// flag: nodes whose node id is less than or equal to this value are
// treated as storage (static) nodes, the rest as dynamic nodes.
DefaultMaxStaticNodeId = 50000
)

func FilterStorageNodes(nodes []*Ydb_Maintenance.Node, maxStaticNodeId uint32) []*Ydb_Maintenance.Node {
return collections.FilterBy(nodes,
func(node *Ydb_Maintenance.Node) bool {
Expand Down Expand Up @@ -138,9 +142,11 @@ func SatisfiedVersion(node *Ydb_Maintenance.Node, version *options.VersionSpec)

major, minor, patch, err := parseNodeVersion(node.Version)
if err != nil {
zap.S().Errorf(`ALARM: failed to parse %s when user specified a non-nil version. The filtering will
errorMsg := fmt.Sprintf(`ALARM: failed to parse '%s' when user specified a non-nil version. The filtering will
be conservative and not include the node, but it might be not what you want. Either you have a weird node
version in your cluster or we need to teach 'ydbops' to support one more version format.`)
version in your cluster or we need to teach 'ydbops' to support one more version format.`, node.Version)

zap.S().Errorf(errorMsg)
return false
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/rolling/restarters/restarters_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package restarters

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestRestart is the `go test` entry point for this package's Ginkgo specs:
// it wires Gomega assertion failures into Ginkgo and then runs every spec
// registered in the package under a single suite.
func TestRestart(t *testing.T) {
	const suiteName = "Restart Suite"

	// Gomega must know how to report a failed expectation before any spec runs.
	RegisterFailHandler(Fail)
	RunSpecs(t, suiteName)
}
23 changes: 16 additions & 7 deletions pkg/rolling/restarters/storage_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,29 @@ func populateWithK8sRules(
return selectedNodes
}

// applyStorageK8sFilteringRules runs the storage-node selection pipeline used
// by the Kubernetes storage restarter, without touching any Kubernetes state
// itself (the FQDN-to-pod mapping is passed in by the caller).
//
// The pipeline is, in order:
//  1. FilterStorageNodes narrows allNodes down using spec.MaxStaticNodeId;
//  2. populateWithK8sRules picks nodes from that set based on spec and
//     fqdnToPodName;
//  3. ExcludeByCommonFields applies the remaining spec-driven exclusions.
//
// The result of the last step is returned.
func applyStorageK8sFilteringRules(
	spec FilterNodeParams,
	allNodes []*Ydb_Maintenance.Node,
	fqdnToPodName map[string]string,
) []*Ydb_Maintenance.Node {
	storageOnly := FilterStorageNodes(allNodes, spec.MaxStaticNodeId)
	picked := populateWithK8sRules(storageOnly, spec, fqdnToPodName)

	return ExcludeByCommonFields(picked, spec)
}

// Filter returns the storage nodes this restarter should restart.
//
// It calls prepareK8sState with the storage-node label selector (presumably
// this is what populates r.FQDNToPodName — confirm against prepareK8sState)
// and then delegates the actual selection to applyStorageK8sFilteringRules.
//
// The returned slice is the fully filtered one, i.e. the same list that is
// logged. Returning the intermediate selection instead would skip the
// ExcludeByCommonFields step.
func (r *StorageK8sRestarter) Filter(
	spec FilterNodeParams,
	cluster ClusterNodesInfo,
) []*Ydb_Maintenance.Node {
	storageLabelSelector := "app.kubernetes.io/component=storage-node"

	r.prepareK8sState(r.Opts.kubeconfigPath, storageLabelSelector, r.Opts.namespace)

	filteredNodes := applyStorageK8sFilteringRules(spec, cluster.AllNodes, r.FQDNToPodName)

	r.logger.Debugf("Storage K8s restarter selected following nodes for restart: %v", filteredNodes)
	return filteredNodes
}
64 changes: 64 additions & 0 deletions pkg/rolling/restarters/storage_k8s_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package restarters

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/ydb-platform/ydbops/pkg/options"
"github.com/ydb-platform/ydbops/tests/mock"
)

// Spec for the node selection performed by the Kubernetes storage restarter.
// It exercises applyStorageK8sFilteringRules directly with an empty
// FQDN-to-pod mapping, so no Kubernetes state is required.
var _ = Describe("Test storage k8s Filter", func() {
	var (
		now                     = time.Now()
		tenMinutesAgoTimestamp  = now.Add(-10 * time.Minute)
		fiveMinutesAgoTimestamp = now.Add(-5 * time.Minute)
	)

	// NOTE(review): the spec title says ">" while the filter below uses
	// Direction '<' — confirm which one the title is meant to describe.
	It("k8s restarter filtering by --started>timestamp", func() {
		// Nodes 1-3 get an explicit start time of ten minutes ago; nodes 4-8
		// have no TestNodeInfo entry and keep whatever the mock assigns.
		startedEarlier := mock.TestNodeInfo{StartTime: tenMinutesAgoTimestamp}
		nodes := mock.CreateNodesFromShortConfig(
			[][]uint32{
				{1, 2, 3, 4, 5, 6, 7, 8},
			},
			map[uint32]mock.TestNodeInfo{
				1: startedEarlier,
				2: startedEarlier,
				3: startedEarlier,
			},
		)

		started := &options.StartedTime{
			Direction: '<',
			Timestamp: fiveMinutesAgoTimestamp,
		}
		filterSpec := FilterNodeParams{
			MaxStaticNodeId: DefaultMaxStaticNodeId,
			StartedTime:     started,
		}

		selected := applyStorageK8sFilteringRules(filterSpec, nodes, map[string]string{})

		Expect(len(selected)).To(Equal(3))

		selectedIDs := make(map[uint32]struct{}, len(selected))
		for _, n := range selected {
			selectedIDs[n.NodeId] = struct{}{}
		}

		// Exactly the three nodes with the older start time must be present.
		for _, wantID := range []uint32{1, 2, 3} {
			Expect(selectedIDs).Should(HaveKey(wantID))
		}
	})
})
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tests
package restarters

import (
"time"
Expand All @@ -8,20 +8,18 @@ import (
"go.uber.org/zap"

"github.com/ydb-platform/ydbops/pkg/options"
"github.com/ydb-platform/ydbops/pkg/rolling"
"github.com/ydb-platform/ydbops/pkg/rolling/restarters"
"github.com/ydb-platform/ydbops/tests/mock"
)

var _ = Describe("Test storage Filter", func() {
var _ = Describe("Test storage ssh Filter", func() {
var (
now = time.Now()
tenMinutesAgoTimestamp = now.Add(-10 * time.Minute)
fiveMinutesAgoTimestamp = now.Add(-5 * time.Minute)
)

It("ssh restarter filtering by --started>timestamp", func() {
restarter := restarters.NewStorageSSHRestarter(zap.S(), []string{}, "")
restarter := NewStorageSSHRestarter(zap.S(), []string{}, "")

nodeGroups := [][]uint32{
{1, 2, 3, 4, 5, 6, 7, 8},
Expand All @@ -40,15 +38,15 @@ var _ = Describe("Test storage Filter", func() {

nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap)

filterSpec := restarters.FilterNodeParams{
MaxStaticNodeId: rolling.DefaultMaxStaticNodeId,
filterSpec := FilterNodeParams{
MaxStaticNodeId: DefaultMaxStaticNodeId,
StartedTime: &options.StartedTime{
Direction: '<',
Timestamp: fiveMinutesAgoTimestamp,
},
}

clusterInfo := restarters.ClusterNodesInfo{
clusterInfo := ClusterNodesInfo{
AllNodes: nodes,
TenantToNodeIds: map[string][]uint32{},
}
Expand All @@ -67,8 +65,8 @@ var _ = Describe("Test storage Filter", func() {
Expect(filteredNodeIds).Should(HaveKey(uint32(3)))
})

It("ssh restarter without arguments takes all storage nodes", func() {
restarter := restarters.NewStorageSSHRestarter(zap.S(), []string{}, "")
It("storage restarter without arguments takes all storage nodes and no dynnodes", func() {
restarter := NewStorageSSHRestarter(zap.S(), []string{}, "")

nodeGroups := [][]uint32{
{1, 2, 3, 4, 5, 6, 7, 8},
Expand All @@ -91,11 +89,11 @@ var _ = Describe("Test storage Filter", func() {

nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap)

filterSpec := restarters.FilterNodeParams{
MaxStaticNodeId: rolling.DefaultMaxStaticNodeId,
filterSpec := FilterNodeParams{
MaxStaticNodeId: DefaultMaxStaticNodeId,
}

clusterInfo := restarters.ClusterNodesInfo{
clusterInfo := ClusterNodesInfo{
AllNodes: nodes,
TenantToNodeIds: map[string][]uint32{
"fakeTenant": {9, 10, 11},
Expand Down
24 changes: 17 additions & 7 deletions pkg/rolling/restarters/tenant_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,30 @@ func (r TenantK8sRestarter) RestartNode(node *Ydb_Maintenance.Node) error {
return r.restartNodeByRestartingPod(node.Host, r.Opts.namespace)
}

func (r *TenantK8sRestarter) Filter(spec FilterNodeParams, cluster ClusterNodesInfo) []*Ydb_Maintenance.Node {
databaseLabelSelector := "app.kubernetes.io/component=dynamic-node"

r.prepareK8sState(r.Opts.kubeconfigPath, databaseLabelSelector, r.Opts.namespace)

// applyTenantK8sFilteringRules runs the tenant (dynamic) node selection
// pipeline used by the Kubernetes tenant restarter. It is a free function that
// takes the FQDN-to-pod mapping as an argument, so it can be called without a
// restarter instance or any Kubernetes state.
//
// The pipeline is, in order:
//  1. FilterTenantNodes narrows cluster.AllNodes down to tenant nodes;
//  2. populateWithK8sRules picks nodes from that set based on spec and
//     fqdnToPodName;
//  3. ExcludeByTenantNames restricts the selection using spec.SelectedTenants
//     and cluster.TenantToNodeIds;
//  4. ExcludeByCommonFields applies the remaining spec-driven exclusions.
//
// The result of the last step is returned.
func applyTenantK8sFilteringRules(
	spec FilterNodeParams,
	cluster ClusterNodesInfo,
	fqdnToPodName map[string]string,
) []*Ydb_Maintenance.Node {
	tenantNodes := FilterTenantNodes(cluster.AllNodes)

	// Use the mapping passed in by the caller; there is no receiver here.
	selectedNodes := populateWithK8sRules(tenantNodes, spec, fqdnToPodName)

	selectedNodes = ExcludeByTenantNames(selectedNodes, spec.SelectedTenants, cluster.TenantToNodeIds)

	filteredNodes := ExcludeByCommonFields(selectedNodes, spec)

	return filteredNodes
}

// Filter returns the tenant (dynamic) nodes this restarter should restart.
//
// It calls prepareK8sState with the dynamic-node label selector (presumably
// this is what populates r.FQDNToPodName — confirm against prepareK8sState)
// and then delegates the actual selection to applyTenantK8sFilteringRules.
//
// The returned slice is the fully filtered one, i.e. the same list that is
// logged, so the tenant-name and common-field exclusions are honoured.
func (r *TenantK8sRestarter) Filter(spec FilterNodeParams, cluster ClusterNodesInfo) []*Ydb_Maintenance.Node {
	databaseLabelSelector := "app.kubernetes.io/component=dynamic-node"

	r.prepareK8sState(r.Opts.kubeconfigPath, databaseLabelSelector, r.Opts.namespace)

	filteredNodes := applyTenantK8sFilteringRules(spec, cluster, r.FQDNToPodName)

	r.logger.Debugf("Tenant K8s restarter selected following nodes for restart: %v", filteredNodes)

	return filteredNodes
}
94 changes: 94 additions & 0 deletions pkg/rolling/restarters/tenant_k8s_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package restarters

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/ydb-platform/ydbops/pkg/options"
"github.com/ydb-platform/ydbops/tests/mock"
)

// Spec for the node selection performed by the Kubernetes tenant restarter.
// It exercises applyTenantK8sFilteringRules directly with an empty
// FQDN-to-pod mapping, so no Kubernetes state is required.
var _ = Describe("Test tenant k8s Filter", func() {
	var (
		now                     = time.Now()
		tenMinutesAgoTimestamp  = now.Add(-10 * time.Minute)
		fiveMinutesAgoTimestamp = now.Add(-5 * time.Minute)
	)

	// NOTE(review): the spec title says ">" while the filter below uses
	// Direction '<' — confirm which one the title is meant to describe.
	It("k8s restarter filtering by --started>timestamp", func() {
		tenant1Name := "tenant1"
		tenant2Name := "tenant2"

		// dynnode describes a dynamic node of the given tenant with no
		// explicit start time; oldDynnode additionally pins the start time to
		// ten minutes ago.
		dynnode := func(tenant string) mock.TestNodeInfo {
			return mock.TestNodeInfo{IsDynnode: true, TenantName: tenant}
		}
		oldDynnode := func(tenant string) mock.TestNodeInfo {
			info := dynnode(tenant)
			info.StartTime = tenMinutesAgoTimestamp
			return info
		}

		// Group 1-8 has no dynnode entries; node 1 also has the older start
		// time but is not marked as a dynnode, so it must not be selected.
		nodes := mock.CreateNodesFromShortConfig(
			[][]uint32{
				{1, 2, 3, 4, 5, 6, 7, 8},
				{9, 10, 11},
				{12, 13, 14},
			},
			map[uint32]mock.TestNodeInfo{
				1:  {StartTime: tenMinutesAgoTimestamp},
				9:  oldDynnode(tenant1Name),
				10: dynnode(tenant1Name),
				11: dynnode(tenant1Name),
				12: oldDynnode(tenant2Name),
				13: dynnode(tenant2Name),
				14: oldDynnode(tenant2Name),
			},
		)

		clusterInfo := ClusterNodesInfo{
			AllNodes: nodes,
			TenantToNodeIds: map[string][]uint32{
				tenant1Name: {9, 10, 11},
				tenant2Name: {12, 13, 14},
			},
		}

		started := &options.StartedTime{
			Direction: '<',
			Timestamp: fiveMinutesAgoTimestamp,
		}
		filterSpec := FilterNodeParams{
			MaxStaticNodeId: DefaultMaxStaticNodeId,
			StartedTime:     started,
		}

		selected := applyTenantK8sFilteringRules(filterSpec, clusterInfo, map[string]string{})

		Expect(len(selected)).To(Equal(3))

		selectedIDs := make(map[uint32]struct{}, len(selected))
		for _, n := range selected {
			selectedIDs[n.NodeId] = struct{}{}
		}

		// Only the dynnodes with the older start time must be present.
		for _, wantID := range []uint32{9, 12, 14} {
			Expect(selectedIDs).Should(HaveKey(wantID))
		}
	})
})
Loading

0 comments on commit 1bab3d1

Please sign in to comment.