Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize the code logic #198

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions controllers/logicvolume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@ package controllers
import (
"context"
"fmt"
"github.com/carina-io/carina"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"sigs.k8s.io/controller-runtime/pkg/controller"
"strconv"
"time"

"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/carina-io/carina"
carinav1 "github.com/carina-io/carina/api/v1"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
)

// LogicVolumeReconciler reconciles a LogicVolume object
Expand Down Expand Up @@ -106,6 +106,8 @@ func (r *LogicVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {

// operation lvm
func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to remove LV name ", lv.Name)

// Finalizer's process ( RemoveLV then removeString ) is not atomic,
// so checking existence of LV to ensure its idempotence
var err error
Expand Down Expand Up @@ -145,9 +147,10 @@ func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carina
}

func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to create LV name ", lv.Name)

// When lv.Status.Code is not codes.OK (== 0), CreateLV has already failed.
// LogicalVolume CRD will be deleted soon by the controller.

if lv.Status.Code != codes.OK {
return nil
}
Expand Down Expand Up @@ -238,6 +241,8 @@ func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.Logic
}

func (r *LogicVolumeReconciler) expandLV(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to expand LV name ", lv.Name)

// The reconciliation loop of LogicVolume may call expandLV before resizing is triggered.
// So, lv.Status.CurrentSize could be nil here.
if lv.Status.CurrentSize == nil {
Expand Down
51 changes: 34 additions & 17 deletions pkg/csidriver/driver/k8s/logicvolume_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ import (
"context"
"errors"
"fmt"
"github.com/carina-io/carina"
"github.com/carina-io/carina/getter"
"sync"
"time"

carinav1 "github.com/carina-io/carina/api/v1"
"github.com/carina-io/carina/utils/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/carina-io/carina"
carinav1 "github.com/carina-io/carina/api/v1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/utils/log"
)

// ErrVolumeNotFound represents the specified volume is not found.
Expand All @@ -45,7 +45,6 @@ type LogicVolumeService struct {
client.Client
getter *getter.RetryGetter
lvGetter *logicVolumeGetter
mu sync.Mutex
}

const (
Expand Down Expand Up @@ -74,7 +73,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string)
}

// not found. try direct reader.
err = v.api.List(ctx, lvList)
err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}})
if err != nil {
return nil, err
}
Expand All @@ -96,7 +95,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string)
return foundLv, nil
}

func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) {
func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) {
lvList := new(carinav1.LogicVolumeList)
var lvs []*carinav1.LogicVolume
err := v.cache.List(ctx, lvList, client.MatchingFields{indexFiledNodeName: nodeName})
Expand All @@ -112,8 +111,12 @@ func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string)
return lvs, nil
}

if !tryReader {
return lvs, nil
}

// not found. try direct reader.
err = v.api.List(ctx, lvList)
err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,8 +160,6 @@ func NewLogicVolumeService(mgr manager.Manager) (*LogicVolumeService, error) {
// CreateVolume creates volume
func (s *LogicVolumeService) CreateVolume(ctx context.Context, namespace, pvc, node, deviceGroup, pvName string, requestGb int64, owner metav1.OwnerReference, annotation map[string]string) (string, uint32, uint32, error) {
log.Info("k8s.CreateVolume called name ", pvName, " node ", node, " deviceGroup ", deviceGroup, " size_gb ", requestGb)
s.mu.Lock()
defer s.mu.Unlock()

lv := &carinav1.LogicVolume{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -249,6 +250,10 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
return err
}

if !lv.GetDeletionTimestamp().IsZero() {
return errors.New("lv is being deleted")
}

err = s.Delete(ctx, lv)
if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -257,6 +262,13 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
return err
}

// if the node doesn't exist, return directly
existingNode := new(corev1.Node)
err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode)
if err != nil {
return err
}

// wait until delete the target volume
for {
log.Info("waiting for delete LogicalVolume name ", lv.Name)
Expand All @@ -280,14 +292,19 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
// ExpandVolume expands volume
func (s *LogicVolumeService) ExpandVolume(ctx context.Context, volumeID string, requestGb int64) error {
log.Info("k8s.ExpandVolume called volumeID ", volumeID, " requestGb ", requestGb)
s.mu.Lock()
defer s.mu.Unlock()

lv, err := s.GetLogicVolumeByVolumeId(ctx, volumeID)
if err != nil {
return err
}

// if the node doesn't exist, return directly
existingNode := new(corev1.Node)
err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode)
if err != nil {
return err
}

err = s.UpdateLogicVolumeSpecSize(ctx, volumeID, resource.NewQuantity(requestGb<<30, resource.BinarySI))
if err != nil {
return err
Expand Down Expand Up @@ -331,8 +348,8 @@ func (s *LogicVolumeService) GetLogicVolumeByVolumeId(ctx context.Context, volum
}

// GetLogicVolumesByNodeName returns logicVolumes by node name.
func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) {
return s.lvGetter.GetByNodeName(ctx, nodeName)
func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) {
return s.lvGetter.GetByNodeName(ctx, nodeName, tryReader)
}

// UpdateLogicVolumeCurrentSize UpdateCurrentSize updates .Status.CurrentSize of LogicVolume.
Expand Down
22 changes: 12 additions & 10 deletions pkg/csidriver/driver/k8s/node_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ import (
"context"
"errors"
"fmt"
"github.com/carina-io/carina"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/csidriver/driver/util"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
"sort"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sort"
"strings"

"github.com/carina-io/carina"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/csidriver/driver/util"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
)

// This annotation is present on K8s 1.11 release.
Expand Down Expand Up @@ -71,7 +73,7 @@ func NewNodeService(mgr manager.Manager, lvService *LogicVolumeService) *NodeSer

func (n NodeService) getLvExclusivityDisks(ctx context.Context, nodeName string) ([]string, error) {
lvExclusivityDisks := []string{}
lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName)
lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName, true)
if err != nil {
return nil, status.Errorf(codes.Internal, "can not get logic volumes for nodeName %s", nodeName)
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/devicemanager/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func parseUdevInfo(output string) map[string]string {
}
return result
}

func (ld *LocalPartitionImplement) CreatePartition(name, groups string, size uint64) error {
partition, _ := ld.GetPartition(name, groups)
if partition.Name == name {
Expand Down Expand Up @@ -416,6 +417,7 @@ func parseDiskString(diskString string) []*types.LocalDisk {

diskString = strings.ReplaceAll(diskString, "\"", "")
//diskString = strings.ReplaceAll(diskString, " ", "")
parentDisk := map[string]int8{}

blksList := strings.Split(diskString, "\n")
for _, blks := range blksList {
Expand Down Expand Up @@ -447,6 +449,7 @@ func parseDiskString(diskString string) []*types.LocalDisk {
tmp.Filesystem = k[1]
case "PKNAME":
tmp.ParentName = k[1]
parentDisk[tmp.ParentName] = 1
case "MAJ:MIN":
tmp.DeviceNumber = k[1]
default:
Expand All @@ -456,8 +459,13 @@ func parseDiskString(diskString string) []*types.LocalDisk {

resp = append(resp, &tmp)
}
return resp

for _, res := range resp {
if _, ok := parentDisk[res.Name]; ok {
res.HavePartitions = true
}
}
return resp
}

func filter(disklist []*types.LocalDisk) (diskList []*types.LocalDisk) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/devicemanager/types/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ type LocalDisk struct {
ParentName string `json:"parentName"`
// Device number
DeviceNumber string `json:"deviceNumber"`
// Have partitions
HavePartitions bool `json:"havePartitions"`
}
6 changes: 4 additions & 2 deletions pkg/metrics/volume_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package metrics
import (
"context"
"errors"
"github.com/carina-io/carina/pkg/csidriver/driver/k8s"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/procfs/blockdevice"

"github.com/carina-io/carina/pkg/csidriver/driver/k8s"
)

const (
Expand Down Expand Up @@ -138,7 +140,7 @@ func (v *volumeStatsCollector) Update(ch chan<- prometheus.Metric) error {
if err != nil {
return errors.New("couldn't get diskstats:" + err.Error())
}
logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName)
logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName, false)
if err != nil {
return err
}
Expand Down
13 changes: 5 additions & 8 deletions runners/devicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package runners

import (
"context"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"k8s.io/apimachinery/pkg/api/equality"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/manager"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/carina-io/carina/pkg/configuration"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"github.com/carina-io/carina/pkg/devicemanager/types"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
Expand Down Expand Up @@ -218,10 +219,6 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec
return blockClass, nil
}

parentDisk := map[string]int8{}
for _, d := range localDisk {
parentDisk[d.ParentName] = 1
}
// If the disk has been added to a VG group, add it to this vg group
hasMatchedDisk := map[string]int8{}

Expand All @@ -238,7 +235,7 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec
// 过滤出空块设备
for _, d := range localDisk {
// 如果是其他磁盘Parent直接跳过
if _, ok := parentDisk[d.Name]; ok {
if d.HavePartitions {
continue
}

Expand Down
Loading
Loading