From ff8742c58ce52ab4a6a60b6444f6cf6c88552426 Mon Sep 17 00:00:00 2001 From: Hao Fan Date: Sun, 17 Sep 2023 17:01:02 +0800 Subject: [PATCH 1/4] feat: add havePartitions param to localDisk Signed-off-by: Hao Fan --- pkg/devicemanager/partition/partition.go | 10 +++++++++- pkg/devicemanager/types/disk.go | 2 ++ runners/devicecheck.go | 13 +++++------- runners/devicecheck_test.go | 25 ++++++++++++------------ 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/pkg/devicemanager/partition/partition.go b/pkg/devicemanager/partition/partition.go index 4a3fb180..08345fff 100644 --- a/pkg/devicemanager/partition/partition.go +++ b/pkg/devicemanager/partition/partition.go @@ -137,6 +137,7 @@ func parseUdevInfo(output string) map[string]string { } return result } + func (ld *LocalPartitionImplement) CreatePartition(name, groups string, size uint64) error { partition, _ := ld.GetPartition(name, groups) if partition.Name == name { @@ -416,6 +417,7 @@ func parseDiskString(diskString string) []*types.LocalDisk { diskString = strings.ReplaceAll(diskString, "\"", "") //diskString = strings.ReplaceAll(diskString, " ", "") + parentDisk := map[string]int8{} blksList := strings.Split(diskString, "\n") for _, blks := range blksList { @@ -447,6 +449,7 @@ func parseDiskString(diskString string) []*types.LocalDisk { tmp.Filesystem = k[1] case "PKNAME": tmp.ParentName = k[1] + parentDisk[tmp.ParentName] = 1 case "MAJ:MIN": tmp.DeviceNumber = k[1] default: @@ -456,8 +459,13 @@ func parseDiskString(diskString string) []*types.LocalDisk { resp = append(resp, &tmp) } - return resp + for _, res := range resp { + if _, ok := parentDisk[res.Name]; ok { + res.HavePartitions = true + } + } + return resp } func filter(disklist []*types.LocalDisk) (diskList []*types.LocalDisk) { diff --git a/pkg/devicemanager/types/disk.go b/pkg/devicemanager/types/disk.go index 76c2f93a..32a7bca3 100644 --- a/pkg/devicemanager/types/disk.go +++ b/pkg/devicemanager/types/disk.go @@ -58,4 +58,6 @@ type LocalDisk struct { ParentName string `json:"parentName"` // Device number DeviceNumber string `json:"deviceNumber"` + // Have partitions + HavePartitions bool `json:"havePartitions"` } diff --git a/runners/devicecheck.go b/runners/devicecheck.go index 3c6fd066..51d1dbcf 100644 --- a/runners/devicecheck.go +++ b/runners/devicecheck.go @@ -18,14 +18,15 @@ package runners import ( "context" - deviceManager "github.com/carina-io/carina/pkg/devicemanager" - "k8s.io/apimachinery/pkg/api/equality" "regexp" - "sigs.k8s.io/controller-runtime/pkg/manager" "strings" "time" + "k8s.io/apimachinery/pkg/api/equality" + "sigs.k8s.io/controller-runtime/pkg/manager" + "github.com/carina-io/carina/pkg/configuration" + deviceManager "github.com/carina-io/carina/pkg/devicemanager" "github.com/carina-io/carina/pkg/devicemanager/types" "github.com/carina-io/carina/utils" "github.com/carina-io/carina/utils/log" @@ -218,10 +219,6 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec return blockClass, nil } - parentDisk := map[string]int8{} - for _, d := range localDisk { - parentDisk[d.ParentName] = 1 - } // If the disk has been added to a VG group, add it to this vg group hasMatchedDisk := map[string]int8{} @@ -238,7 +235,7 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec // 过滤出空块设备 for _, d := range localDisk { // 如果是其他磁盘Parent直接跳过 - if _, ok := parentDisk[d.Name]; ok { + if d.HavePartitions { continue } diff --git a/runners/devicecheck_test.go b/runners/devicecheck_test.go index 3706a52b..9efc2f70 100644 --- a/runners/devicecheck_test.go +++ b/runners/devicecheck_test.go @@ -1,30 +1,30 @@ /* - Copyright @ 2021 bocloud . +Copyright @ 2021 bocloud . - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package runners import ( "bytes" "fmt" - deviceManager "github.com/carina-io/carina/pkg/devicemanager" "os" "os/exec" "strings" "testing" "github.com/carina-io/carina/pkg/configuration" + deviceManager "github.com/carina-io/carina/pkg/devicemanager" ) const deviceDir = "/tmp/disk/" @@ -70,7 +70,6 @@ func makeLoopbackDevice(name string) (string, error) { } func cleanLoopback(loops []string, files []string) error { - for _, loop := range loops { err := exec.Command("losetup", "-d", loop).Run() if err != nil { @@ -91,7 +90,7 @@ func TestDeviceManager(t *testing.T) { initDevice() defer cleanLoopback(loops, names) - dm := deviceManager.NewDeviceManager("localhost", nil) + dm := deviceManager.NewDeviceManager("localhost", nil, nil) dc := &deviceCheck{dm: dm} defer func() { // 清理volumex From 0c1bd1376e47ccb0dacc5af4cb733e45f6fc26f1 Mon Sep 17 00:00:00 2001 From: Hao Fan Date: Sun, 17 Sep 2023 18:14:56 +0800 Subject: [PATCH 2/4] fix: resolve the problem of continuous list lv on some nodes Signed-off-by: Hao Fan --- .../driver/k8s/logicvolume_service.go | 26 +++++++++++-------- pkg/csidriver/driver/k8s/node_service.go | 22 +++++++++------- pkg/metrics/volume_stats_collector.go | 6 +++-- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/pkg/csidriver/driver/k8s/logicvolume_service.go b/pkg/csidriver/driver/k8s/logicvolume_service.go index c17a0fae..8b045fbe 100644 --- a/pkg/csidriver/driver/k8s/logicvolume_service.go +++ b/pkg/csidriver/driver/k8s/logicvolume_service.go @@ -20,21 +20,21 @@ import ( "context" "errors" "fmt" - "github.com/carina-io/carina" - "github.com/carina-io/carina/getter" "sync" "time" - carinav1 "github.com/carina-io/carina/api/v1" - "github.com/carina-io/carina/utils/log" - "sigs.k8s.io/controller-runtime/pkg/manager" - "google.golang.org/grpc/codes" "google.golang.org/grpc/status" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/carina-io/carina" + carinav1 "github.com/carina-io/carina/api/v1" + "github.com/carina-io/carina/getter" + "github.com/carina-io/carina/utils/log" ) // ErrVolumeNotFound represents the specified volume is not found. @@ -74,7 +74,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string) } // not found. try direct reader. - err = v.api.List(ctx, lvList) + err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}}) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string) return foundLv, nil } -func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) { +func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) { lvList := new(carinav1.LogicVolumeList) var lvs []*carinav1.LogicVolume err := v.cache.List(ctx, lvList, client.MatchingFields{indexFiledNodeName: nodeName}) @@ -112,8 +112,12 @@ func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string) return lvs, nil } + if !tryReader { + return lvs, nil + } + // not found. try direct reader. - err = v.api.List(ctx, lvList) + err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}}) if err != nil { return nil, err } @@ -331,8 +335,8 @@ func (s *LogicVolumeService) GetLogicVolumeByVolumeId(ctx context.Context, volum } // GetLogicVolumesByNodeName returns logicVolumes by node name. -func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) { - return s.lvGetter.GetByNodeName(ctx, nodeName) +func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) { + return s.lvGetter.GetByNodeName(ctx, nodeName, tryReader) } // UpdateLogicVolumeCurrentSize UpdateCurrentSize updates .Status.CurrentSize of LogicVolume. diff --git a/pkg/csidriver/driver/k8s/node_service.go b/pkg/csidriver/driver/k8s/node_service.go index 67552089..655ed2a3 100644 --- a/pkg/csidriver/driver/k8s/node_service.go +++ b/pkg/csidriver/driver/k8s/node_service.go @@ -20,13 +20,9 @@ import ( "context" "errors" "fmt" - "github.com/carina-io/carina" - carinav1beta1 "github.com/carina-io/carina/api/v1beta1" - "github.com/carina-io/carina/getter" - "github.com/carina-io/carina/pkg/configuration" - "github.com/carina-io/carina/pkg/csidriver/driver/util" - "github.com/carina-io/carina/utils" - "github.com/carina-io/carina/utils/log" + "sort" + "strings" + "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -34,8 +30,14 @@ import ( "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" - "sort" - "strings" + + "github.com/carina-io/carina" + carinav1beta1 "github.com/carina-io/carina/api/v1beta1" + "github.com/carina-io/carina/getter" + "github.com/carina-io/carina/pkg/configuration" + "github.com/carina-io/carina/pkg/csidriver/driver/util" + "github.com/carina-io/carina/utils" + "github.com/carina-io/carina/utils/log" ) // This annotation is present on K8s 1.11 release. @@ -71,7 +73,7 @@ func NewNodeService(mgr manager.Manager, lvService *LogicVolumeService) *NodeSer func (n NodeService) getLvExclusivityDisks(ctx context.Context, nodeName string) ([]string, error) { lvExclusivityDisks := []string{} - lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName) + lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName, true) if err != nil { return nil, status.Errorf(codes.Internal, "can not get logic volumes for nodeName %s", nodeName) } diff --git a/pkg/metrics/volume_stats_collector.go b/pkg/metrics/volume_stats_collector.go index b6ca4da4..9e3969d7 100644 --- a/pkg/metrics/volume_stats_collector.go +++ b/pkg/metrics/volume_stats_collector.go @@ -19,9 +19,11 @@ package metrics import ( "context" "errors" - "github.com/carina-io/carina/pkg/csidriver/driver/k8s" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/procfs/blockdevice" + + "github.com/carina-io/carina/pkg/csidriver/driver/k8s" ) const ( @@ -138,7 +140,7 @@ func (v *volumeStatsCollector) Update(ch chan<- prometheus.Metric) error { if err != nil { return errors.New("couldn't get diskstats:" + err.Error()) } - logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName) + logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName, false) if err != nil { return err } From 960725d183134a7dc60f7e96c00c83da05abb734 Mon Sep 17 00:00:00 2001 From: Hao Fan Date: Sun, 17 Sep 2023 21:34:33 +0800 Subject: [PATCH 3/4] fix: resolve the problem of excessive time consumption during pvc expansion Signed-off-by: Hao Fan --- controllers/logicvolume_controller.go | 17 +++++++++++------ pkg/csidriver/driver/k8s/logicvolume_service.go | 6 ------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/controllers/logicvolume_controller.go b/controllers/logicvolume_controller.go index adf83dc2..95c63249 100644 --- a/controllers/logicvolume_controller.go +++ b/controllers/logicvolume_controller.go @@ -19,14 +19,9 @@ package controllers import ( "context" "fmt" - "github.com/carina-io/carina" - deviceManager "github.com/carina-io/carina/pkg/devicemanager" - "sigs.k8s.io/controller-runtime/pkg/controller" "strconv" "time" - "github.com/carina-io/carina/utils" - "github.com/carina-io/carina/utils/log" "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -34,9 +29,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" + "github.com/carina-io/carina" carinav1 "github.com/carina-io/carina/api/v1" + deviceManager "github.com/carina-io/carina/pkg/devicemanager" + "github.com/carina-io/carina/utils" + "github.com/carina-io/carina/utils/log" ) // LogicVolumeReconciler reconciles a LogicVolume object @@ -106,6 +106,8 @@ func (r *LogicVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error { // operation lvm func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carinav1.LogicVolume) error { + log.Info("Start to remove LV name ", lv.Name) + // Finalizer's process ( RemoveLV then removeString ) is not atomic, // so checking existence of LV to ensure its idempotence var err error @@ -145,9 +147,10 @@ func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carina } func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.LogicVolume) error { + log.Info("Start to create LV name ", lv.Name) + // When lv.Status.Code is not codes.OK (== 0), CreateLV has already failed. // LogicalVolume CRD will be deleted soon by the controller. - if lv.Status.Code != codes.OK { return nil } @@ -238,6 +241,8 @@ func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.Logic } func (r *LogicVolumeReconciler) expandLV(ctx context.Context, lv *carinav1.LogicVolume) error { + log.Info("Start to expand LV name ", lv.Name) + // The reconciliation loop of LogicVolume may call expandLV before resizing is triggered. // So, lv.Status.CurrentSize could be nil here. if lv.Status.CurrentSize == nil { diff --git a/pkg/csidriver/driver/k8s/logicvolume_service.go b/pkg/csidriver/driver/k8s/logicvolume_service.go index 8b045fbe..6c209fa1 100644 --- a/pkg/csidriver/driver/k8s/logicvolume_service.go +++ b/pkg/csidriver/driver/k8s/logicvolume_service.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "sync" "time" "google.golang.org/grpc/codes" @@ -45,7 +44,6 @@ type LogicVolumeService struct { client.Client getter *getter.RetryGetter lvGetter *logicVolumeGetter - mu sync.Mutex } const ( @@ -161,8 +159,6 @@ func NewLogicVolumeService(mgr manager.Manager) (*LogicVolumeService, error) { // CreateVolume creates volume func (s *LogicVolumeService) CreateVolume(ctx context.Context, namespace, pvc, node, deviceGroup, pvName string, requestGb int64, owner metav1.OwnerReference, annotation map[string]string) (string, uint32, uint32, error) { log.Info("k8s.CreateVolume called name ", pvName, " node ", node, " deviceGroup ", deviceGroup, " size_gb ", requestGb) - s.mu.Lock() - defer s.mu.Unlock() lv := &carinav1.LogicVolume{ TypeMeta: metav1.TypeMeta{ @@ -284,8 +280,6 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string) // ExpandVolume expands volume func (s *LogicVolumeService) ExpandVolume(ctx context.Context, volumeID string, requestGb int64) error { log.Info("k8s.ExpandVolume called volumeID ", volumeID, " requestGb ", requestGb) - s.mu.Lock() - defer s.mu.Unlock() lv, err := s.GetLogicVolumeByVolumeId(ctx, volumeID) if err != nil { From 2bbefcc5e791455b30c8fb7f1e80aae25ae3ca80 Mon Sep 17 00:00:00 2001 From: Hao Fan Date: Sun, 17 Sep 2023 21:40:04 +0800 Subject: [PATCH 4/4] feat: stop carina controller from trying to expand/delete volume on a node removed from the cluster Signed-off-by: Hao Fan --- .../driver/k8s/logicvolume_service.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/csidriver/driver/k8s/logicvolume_service.go b/pkg/csidriver/driver/k8s/logicvolume_service.go index 6c209fa1..6ad73708 100644 --- a/pkg/csidriver/driver/k8s/logicvolume_service.go +++ b/pkg/csidriver/driver/k8s/logicvolume_service.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -249,6 +250,10 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string) return err } + if !lv.GetDeletionTimestamp().IsZero() { + return errors.New("lv is being deleted") + } + err = s.Delete(ctx, lv) if err != nil { if apierrors.IsNotFound(err) { @@ -257,6 +262,13 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string) return err } + // if the node doesn't exist, return directly + existingNode := new(corev1.Node) + err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode) + if err != nil { + return err + } + // wait until delete the target volume for { log.Info("waiting for delete LogicalVolume name ", lv.Name) @@ -286,6 +298,13 @@ func (s *LogicVolumeService) ExpandVolume(ctx context.Context, volumeID string, return err } + // if the node doesn't exist, return directly + existingNode := new(corev1.Node) + err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode) + if err != nil { + return err + } + err = s.UpdateLogicVolumeSpecSize(ctx, volumeID, resource.NewQuantity(requestGb<<30, resource.BinarySI)) if err != nil { return err