From 1bab3d1c49d1a044fb7324648712781d5c57bc9a Mon Sep 17 00:00:00 2001 From: Jorres Tarasov Date: Tue, 16 Jul 2024 16:22:49 +0200 Subject: [PATCH] fix: rewrite k8s filtering --- pkg/client/auth/credentials/iam_creds.go | 3 +- pkg/rolling/options.go | 4 +- pkg/rolling/restarters/primitives.go | 10 +- .../restarters/restarters_suite_test.go | 13 +++ pkg/rolling/restarters/storage_k8s.go | 23 +++-- pkg/rolling/restarters/storage_k8s_test.go | 64 +++++++++++++ .../rolling/restarters/storage_ssh_test.go | 24 +++-- pkg/rolling/restarters/tenant_k8s.go | 24 +++-- pkg/rolling/restarters/tenant_k8s_test.go | 94 +++++++++++++++++++ pkg/rolling/rolling.go | 5 + tests/e2e_suite_test.go | 2 +- tests/mock/cms-nodes.go | 19 ++-- 12 files changed, 240 insertions(+), 45 deletions(-) create mode 100644 pkg/rolling/restarters/restarters_suite_test.go create mode 100644 pkg/rolling/restarters/storage_k8s_test.go rename tests/filtering_test.go => pkg/rolling/restarters/storage_ssh_test.go (76%) create mode 100644 pkg/rolling/restarters/tenant_k8s_test.go diff --git a/pkg/client/auth/credentials/iam_creds.go b/pkg/client/auth/credentials/iam_creds.go index 5df0579..3cf0ae3 100644 --- a/pkg/client/auth/credentials/iam_creds.go +++ b/pkg/client/auth/credentials/iam_creds.go @@ -5,10 +5,9 @@ import ( "strings" "sync" + "github.com/ydb-platform/ydb-go-sdk/v3/credentials" yc "github.com/ydb-platform/ydb-go-yc" "google.golang.org/grpc/metadata" - - "github.com/ydb-platform/ydb-go-sdk/v3/credentials" ) type iamCredsProvider struct { diff --git a/pkg/rolling/options.go b/pkg/rolling/options.go index 1af81fe..1a3ff06 100644 --- a/pkg/rolling/options.go +++ b/pkg/rolling/options.go @@ -16,6 +16,7 @@ import ( "github.com/ydb-platform/ydbops/internal/collections" "github.com/ydb-platform/ydbops/pkg/options" "github.com/ydb-platform/ydbops/pkg/profile" + "github.com/ydb-platform/ydbops/pkg/rolling/restarters" "github.com/ydb-platform/ydbops/pkg/utils" ) @@ -23,7 +24,6 @@ const ( DefaultRetryCount = 3 DefaultRestartDurationSeconds = 60 DefaultCMSQueryIntervalSeconds = 10 - DefaultMaxStaticNodeId = 50000 ) type RestartOptions struct { @@ -196,7 +196,7 @@ after that would be considered a regular cluster failure`) for this invocation must be the same as for the previous invocation, and this can not be verified at runtime since the ydbops utility is stateless. Use at your own risk.`) - fs.IntVar(&o.MaxStaticNodeId, "max-static-node-id", DefaultMaxStaticNodeId, + fs.IntVar(&o.MaxStaticNodeId, "max-static-node-id", restarters.DefaultMaxStaticNodeId, `This argument is used to help ydbops distinguish storage and dynamic nodes. Nodes with this nodeId or less will be considered storage.`) diff --git a/pkg/rolling/restarters/primitives.go b/pkg/rolling/restarters/primitives.go index 49980a0..a2da824 100644 --- a/pkg/rolling/restarters/primitives.go +++ b/pkg/rolling/restarters/primitives.go @@ -14,6 +14,10 @@ import ( "github.com/ydb-platform/ydbops/pkg/options" ) +const ( + DefaultMaxStaticNodeId = 50000 +) + func FilterStorageNodes(nodes []*Ydb_Maintenance.Node, maxStaticNodeId uint32) []*Ydb_Maintenance.Node { return collections.FilterBy(nodes, func(node *Ydb_Maintenance.Node) bool { @@ -138,9 +142,11 @@ func SatisfiedVersion(node *Ydb_Maintenance.Node, version *options.VersionSpec) major, minor, patch, err := parseNodeVersion(node.Version) if err != nil { - zap.S().Errorf(`ALARM: failed to parse %s when user specified a non-nil version. The filtering will + errorMsg := fmt.Sprintf(`ALARM: failed to parse '%s' when user specified a non-nil version. The filtering will be conservative and not include the node, but it might be not what you want. Either you have a weird node - version in your cluster or we need to teach 'ydbops' to support one more version format.`) + version in your cluster or we need to teach 'ydbops' to support one more version format.`, node.Version) + + zap.S().Errorf(errorMsg) return false } diff --git a/pkg/rolling/restarters/restarters_suite_test.go b/pkg/rolling/restarters/restarters_suite_test.go new file mode 100644 index 0000000..64c29e6 --- /dev/null +++ b/pkg/rolling/restarters/restarters_suite_test.go @@ -0,0 +1,13 @@ +package restarters + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestRestart(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Restart Suite") +} diff --git a/pkg/rolling/restarters/storage_k8s.go b/pkg/rolling/restarters/storage_k8s.go index d42aa2a..cef142a 100644 --- a/pkg/rolling/restarters/storage_k8s.go +++ b/pkg/rolling/restarters/storage_k8s.go @@ -67,20 +67,29 @@ func populateWithK8sRules( return selectedNodes } +func applyStorageK8sFilteringRules( + spec FilterNodeParams, + allNodes []*Ydb_Maintenance.Node, + fqdnToPodName map[string]string, +) []*Ydb_Maintenance.Node { + allStorageNodes := FilterStorageNodes(allNodes, spec.MaxStaticNodeId) + + selectedNodes := populateWithK8sRules(allStorageNodes, spec, fqdnToPodName) + + filteredNodes := ExcludeByCommonFields(selectedNodes, spec) + + return filteredNodes +} + func (r *StorageK8sRestarter) Filter( spec FilterNodeParams, cluster ClusterNodesInfo, ) []*Ydb_Maintenance.Node { storageLabelSelector := "app.kubernetes.io/component=storage-node" - r.prepareK8sState(r.Opts.kubeconfigPath, storageLabelSelector, r.Opts.namespace) - allStorageNodes := FilterStorageNodes(cluster.AllNodes, spec.MaxStaticNodeId) - - selectedNodes := populateWithK8sRules(allStorageNodes, spec, r.FQDNToPodName) - - filteredNodes := ExcludeByCommonFields(selectedNodes, spec) + filteredNodes := applyStorageK8sFilteringRules(spec, cluster.AllNodes, r.FQDNToPodName) r.logger.Debugf("Storage K8s restarter selected following nodes for restart: %v", filteredNodes) - return selectedNodes + return filteredNodes } diff --git a/pkg/rolling/restarters/storage_k8s_test.go b/pkg/rolling/restarters/storage_k8s_test.go new file mode 100644 index 0000000..6156843 --- /dev/null +++ b/pkg/rolling/restarters/storage_k8s_test.go @@ -0,0 +1,64 @@ +package restarters + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/ydb-platform/ydbops/pkg/options" + "github.com/ydb-platform/ydbops/tests/mock" +) + +var _ = Describe("Test storage k8s Filter", func() { + var ( + now = time.Now() + tenMinutesAgoTimestamp = now.Add(-10 * time.Minute) + fiveMinutesAgoTimestamp = now.Add(-5 * time.Minute) + ) + + It("k8s restarter filtering by --started>timestamp", func() { + filterSpec := FilterNodeParams{ + MaxStaticNodeId: DefaultMaxStaticNodeId, + StartedTime: &options.StartedTime{ + Direction: '<', + Timestamp: fiveMinutesAgoTimestamp, + }, + } + + nodeGroups := [][]uint32{ + {1, 2, 3, 4, 5, 6, 7, 8}, + } + nodeInfoMap := map[uint32]mock.TestNodeInfo{ + 1: { + StartTime: tenMinutesAgoTimestamp, + }, + 2: { + StartTime: tenMinutesAgoTimestamp, + }, + 3: { + StartTime: tenMinutesAgoTimestamp, + }, + } + + nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap) + + clusterInfo := ClusterNodesInfo{ + AllNodes: nodes, + TenantToNodeIds: map[string][]uint32{}, + } + + filteredNodes := applyStorageK8sFilteringRules(filterSpec, clusterInfo.AllNodes, map[string]string{}) + + Expect(len(filteredNodes)).To(Equal(3)) + + filteredNodeIds := make(map[uint32]bool) + for _, node := range filteredNodes { + filteredNodeIds[node.NodeId] = true + } + + Expect(filteredNodeIds).Should(HaveKey(uint32(1))) + Expect(filteredNodeIds).Should(HaveKey(uint32(2))) + Expect(filteredNodeIds).Should(HaveKey(uint32(3))) + }) +}) diff --git a/tests/filtering_test.go b/pkg/rolling/restarters/storage_ssh_test.go similarity index 76% rename from tests/filtering_test.go rename to pkg/rolling/restarters/storage_ssh_test.go index fe738bd..821f4b8 100644 --- a/tests/filtering_test.go +++ b/pkg/rolling/restarters/storage_ssh_test.go @@ -1,4 +1,4 @@ -package tests +package restarters import ( "time" @@ -8,12 +8,10 @@ import ( "go.uber.org/zap" "github.com/ydb-platform/ydbops/pkg/options" - "github.com/ydb-platform/ydbops/pkg/rolling" - "github.com/ydb-platform/ydbops/pkg/rolling/restarters" "github.com/ydb-platform/ydbops/tests/mock" ) -var _ = Describe("Test storage Filter", func() { +var _ = Describe("Test storage ssh Filter", func() { var ( now = time.Now() tenMinutesAgoTimestamp = now.Add(-10 * time.Minute) @@ -21,7 +19,7 @@ var _ = Describe("Test storage Filter", func() { ) It("ssh restarter filtering by --started>timestamp", func() { - restarter := restarters.NewStorageSSHRestarter(zap.S(), []string{}, "") + restarter := NewStorageSSHRestarter(zap.S(), []string{}, "") nodeGroups := [][]uint32{ {1, 2, 3, 4, 5, 6, 7, 8}, @@ -40,15 +38,15 @@ var _ = Describe("Test storage Filter", func() { nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap) - filterSpec := restarters.FilterNodeParams{ - MaxStaticNodeId: rolling.DefaultMaxStaticNodeId, + filterSpec := FilterNodeParams{ + MaxStaticNodeId: DefaultMaxStaticNodeId, StartedTime: &options.StartedTime{ Direction: '<', Timestamp: fiveMinutesAgoTimestamp, }, } - clusterInfo := restarters.ClusterNodesInfo{ + clusterInfo := ClusterNodesInfo{ AllNodes: nodes, TenantToNodeIds: map[string][]uint32{}, } @@ -67,8 +65,8 @@ var _ = Describe("Test storage Filter", func() { Expect(filteredNodeIds).Should(HaveKey(uint32(3))) }) - It("ssh restarter without arguments takes all storage nodes", func() { - restarter := restarters.NewStorageSSHRestarter(zap.S(), []string{}, "") + It("storage restarter without arguments takes all storage nodes and no dynnodes", func() { + restarter := NewStorageSSHRestarter(zap.S(), []string{}, "") nodeGroups := [][]uint32{ {1, 2, 3, 4, 5, 6, 7, 8}, @@ -91,11 +89,11 @@ var _ = Describe("Test storage Filter", func() { nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap) - filterSpec := restarters.FilterNodeParams{ - MaxStaticNodeId: rolling.DefaultMaxStaticNodeId, + filterSpec := FilterNodeParams{ + MaxStaticNodeId: DefaultMaxStaticNodeId, } - clusterInfo := restarters.ClusterNodesInfo{ + clusterInfo := ClusterNodesInfo{ AllNodes: nodes, TenantToNodeIds: map[string][]uint32{ "fakeTenant": {9, 10, 11}, diff --git a/pkg/rolling/restarters/tenant_k8s.go b/pkg/rolling/restarters/tenant_k8s.go index e5894d4..59f1781 100644 --- a/pkg/rolling/restarters/tenant_k8s.go +++ b/pkg/rolling/restarters/tenant_k8s.go @@ -41,20 +41,30 @@ func (r TenantK8sRestarter) RestartNode(node *Ydb_Maintenance.Node) error { return r.restartNodeByRestartingPod(node.Host, r.Opts.namespace) } -func (r *TenantK8sRestarter) Filter(spec FilterNodeParams, cluster ClusterNodesInfo) []*Ydb_Maintenance.Node { - databaseLabelSelector := "app.kubernetes.io/component=dynamic-node" - - r.prepareK8sState(r.Opts.kubeconfigPath, databaseLabelSelector, r.Opts.namespace) - +func applyTenantK8sFilteringRules( + spec FilterNodeParams, + cluster ClusterNodesInfo, + fqdnToPodName map[string]string, +) []*Ydb_Maintenance.Node { tenantNodes := FilterTenantNodes(cluster.AllNodes) - selectedNodes := populateWithK8sRules(tenantNodes, spec, r.FQDNToPodName) + selectedNodes := populateWithK8sRules(tenantNodes, spec, fqdnToPodName) selectedNodes = ExcludeByTenantNames(selectedNodes, spec.SelectedTenants, cluster.TenantToNodeIds) filteredNodes := ExcludeByCommonFields(selectedNodes, spec) + return filteredNodes +} + +func (r *TenantK8sRestarter) Filter(spec FilterNodeParams, cluster ClusterNodesInfo) []*Ydb_Maintenance.Node { + databaseLabelSelector := "app.kubernetes.io/component=dynamic-node" + + r.prepareK8sState(r.Opts.kubeconfigPath, databaseLabelSelector, r.Opts.namespace) + + filteredNodes := applyTenantK8sFilteringRules(spec, cluster, r.FQDNToPodName) + r.logger.Debugf("Tenant K8s restarter selected following nodes for restart: %v", filteredNodes) - return selectedNodes + return filteredNodes } diff --git a/pkg/rolling/restarters/tenant_k8s_test.go b/pkg/rolling/restarters/tenant_k8s_test.go new file mode 100644 index 0000000..f367b57 --- /dev/null +++ b/pkg/rolling/restarters/tenant_k8s_test.go @@ -0,0 +1,94 @@ +package restarters + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/ydb-platform/ydbops/pkg/options" + "github.com/ydb-platform/ydbops/tests/mock" +) + +var _ = Describe("Test tenant k8s Filter", func() { + var ( + now = time.Now() + tenMinutesAgoTimestamp = now.Add(-10 * time.Minute) + fiveMinutesAgoTimestamp = now.Add(-5 * time.Minute) + ) + + It("k8s restarter filtering by --started>timestamp", func() { + filterSpec := FilterNodeParams{ + MaxStaticNodeId: DefaultMaxStaticNodeId, + StartedTime: &options.StartedTime{ + Direction: '<', + Timestamp: fiveMinutesAgoTimestamp, + }, + } + + nodeGroups := [][]uint32{ + {1, 2, 3, 4, 5, 6, 7, 8}, + {9, 10, 11}, + {12, 13, 14}, + } + + tenant1Name := "tenant1" + tenant2Name := "tenant2" + + nodeInfoMap := map[uint32]mock.TestNodeInfo{ + 1: { + StartTime: tenMinutesAgoTimestamp, + }, + 9: { + StartTime: tenMinutesAgoTimestamp, + IsDynnode: true, + TenantName: tenant1Name, + }, + 10: { + IsDynnode: true, + TenantName: tenant1Name, + }, + 11: { + IsDynnode: true, + TenantName: tenant1Name, + }, + 12: { + StartTime: tenMinutesAgoTimestamp, + IsDynnode: true, + TenantName: tenant2Name, + }, + 13: { + IsDynnode: true, + TenantName: tenant2Name, + }, + 14: { + StartTime: tenMinutesAgoTimestamp, + IsDynnode: true, + TenantName: tenant2Name, + }, + } + + nodes := mock.CreateNodesFromShortConfig(nodeGroups, nodeInfoMap) + + clusterInfo := ClusterNodesInfo{ + AllNodes: nodes, + TenantToNodeIds: map[string][]uint32{ + tenant1Name: {9, 10, 11}, + tenant2Name: {12, 13, 14}, + }, + } + + filteredNodes := applyTenantK8sFilteringRules(filterSpec, clusterInfo, map[string]string{}) + + Expect(len(filteredNodes)).To(Equal(3)) + + filteredNodeIds := make(map[uint32]bool) + for _, node := range filteredNodes { + filteredNodeIds[node.NodeId] = true + } + + Expect(filteredNodeIds).Should(HaveKey(uint32(9))) + Expect(filteredNodeIds).Should(HaveKey(uint32(12))) + Expect(filteredNodeIds).Should(HaveKey(uint32(14))) + }) +}) diff --git a/pkg/rolling/rolling.go b/pkg/rolling/rolling.go index df45f05..4898d85 100644 --- a/pkg/rolling/rolling.go +++ b/pkg/rolling/rolling.go @@ -138,6 +138,8 @@ func (r *Rolling) DoRestart() error { }, ) + fmt.Printf("nodesToRestart %v\n", nodesToRestart) + excludedNodes := 0 for _, node := range nodesToRestart { if _, present := r.state.inactiveNodes[node.NodeId]; present { @@ -162,11 +164,14 @@ func (r *Rolling) DoRestart() error { ScopeType: cms.NodeScope, Nodes: nodesToRestart, } + task, err := r.cms.CreateMaintenanceTask(taskParams) if err != nil { return fmt.Errorf("failed to create maintenance task: %w", err) } + // return nil + return r.cmsWaitingLoop(task, len(nodesToRestart)) } diff --git a/tests/e2e_suite_test.go b/tests/e2e_suite_test.go index b9590db..61c2348 100644 --- a/tests/e2e_suite_test.go +++ b/tests/e2e_suite_test.go @@ -9,5 +9,5 @@ import ( func TestRestart(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Restart Suite") + RunSpecs(t, "Restart e2e suite") } diff --git a/tests/mock/cms-nodes.go b/tests/mock/cms-nodes.go index 11980dd..a253b54 100644 --- a/tests/mock/cms-nodes.go +++ b/tests/mock/cms-nodes.go @@ -101,19 +101,16 @@ func CreateNodesFromShortConfig(nodeGroups [][]uint32, nodeInfo map[uint32]TestN testNodeInfo, moreInfoPresent := nodeInfo[nodeID] node := makeNode(nodeID) - if !moreInfoPresent { - node.Type = &Ydb_Maintenance.Node_Storage{ - Storage: &Ydb_Maintenance.Node_StorageNode{}, - } - - node.State = Ydb_Maintenance.ItemState_ITEM_STATE_UP - - node.StartTime = timestamppb.New(time.Now()) - - node.Version = "some-fake-version-will-fail-when-parsing" + // Put default values: + node.Type = &Ydb_Maintenance.Node_Storage{ + Storage: &Ydb_Maintenance.Node_StorageNode{}, + } + node.State = Ydb_Maintenance.ItemState_ITEM_STATE_UP + node.StartTime = timestamppb.New(time.Now()) + node.Version = "some-fake-version-will-fail-when-parsing" + if !moreInfoPresent { nodes = append(nodes, node) - continue }