Skip to content

Commit

Permalink
Merge pull request #198 from fanhaouu/feat-optimize-logic
Browse files Browse the repository at this point in the history
optimize the code logic
  • Loading branch information
carina-ci-bot authored Sep 22, 2023
2 parents e99ac36 + 2bbefcc commit 4ad466b
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 57 deletions.
17 changes: 11 additions & 6 deletions controllers/logicvolume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@ package controllers
import (
"context"
"fmt"
"github.com/carina-io/carina"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"sigs.k8s.io/controller-runtime/pkg/controller"
"strconv"
"time"

"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/carina-io/carina"
carinav1 "github.com/carina-io/carina/api/v1"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
)

// LogicVolumeReconciler reconciles a LogicVolume object
Expand Down Expand Up @@ -106,6 +106,8 @@ func (r *LogicVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {

// operation lvm
func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to remove LV name ", lv.Name)

// Finalizer's process ( RemoveLV then removeString ) is not atomic,
// so checking existence of LV to ensure its idempotence
var err error
Expand Down Expand Up @@ -145,9 +147,10 @@ func (r *LogicVolumeReconciler) removeLVIfExists(ctx context.Context, lv *carina
}

func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to create LV name ", lv.Name)

// When lv.Status.Code is not codes.OK (== 0), CreateLV has already failed.
// LogicalVolume CRD will be deleted soon by the controller.

if lv.Status.Code != codes.OK {
return nil
}
Expand Down Expand Up @@ -238,6 +241,8 @@ func (r *LogicVolumeReconciler) createLV(ctx context.Context, lv *carinav1.Logic
}

func (r *LogicVolumeReconciler) expandLV(ctx context.Context, lv *carinav1.LogicVolume) error {
log.Info("Start to expand LV name ", lv.Name)

// The reconciliation loop of LogicVolume may call expandLV before resizing is triggered.
// So, lv.Status.CurrentSize could be nil here.
if lv.Status.CurrentSize == nil {
Expand Down
51 changes: 34 additions & 17 deletions pkg/csidriver/driver/k8s/logicvolume_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ import (
"context"
"errors"
"fmt"
"github.com/carina-io/carina"
"github.com/carina-io/carina/getter"
"sync"
"time"

carinav1 "github.com/carina-io/carina/api/v1"
"github.com/carina-io/carina/utils/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/carina-io/carina"
carinav1 "github.com/carina-io/carina/api/v1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/utils/log"
)

// ErrVolumeNotFound represents the specified volume is not found.
Expand All @@ -45,7 +45,6 @@ type LogicVolumeService struct {
client.Client
getter *getter.RetryGetter
lvGetter *logicVolumeGetter
mu sync.Mutex
}

const (
Expand Down Expand Up @@ -74,7 +73,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string)
}

// not found. try direct reader.
err = v.api.List(ctx, lvList)
err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}})
if err != nil {
return nil, err
}
Expand All @@ -96,7 +95,7 @@ func (v *logicVolumeGetter) GetByVolumeId(ctx context.Context, volumeID string)
return foundLv, nil
}

func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) {
func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) {
lvList := new(carinav1.LogicVolumeList)
var lvs []*carinav1.LogicVolume
err := v.cache.List(ctx, lvList, client.MatchingFields{indexFiledNodeName: nodeName})
Expand All @@ -112,8 +111,12 @@ func (v *logicVolumeGetter) GetByNodeName(ctx context.Context, nodeName string)
return lvs, nil
}

if !tryReader {
return lvs, nil
}

// not found. try direct reader.
err = v.api.List(ctx, lvList)
err = v.api.List(ctx, lvList, &client.ListOptions{Raw: &metav1.ListOptions{ResourceVersion: "0"}})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,8 +160,6 @@ func NewLogicVolumeService(mgr manager.Manager) (*LogicVolumeService, error) {
// CreateVolume creates volume
func (s *LogicVolumeService) CreateVolume(ctx context.Context, namespace, pvc, node, deviceGroup, pvName string, requestGb int64, owner metav1.OwnerReference, annotation map[string]string) (string, uint32, uint32, error) {
log.Info("k8s.CreateVolume called name ", pvName, " node ", node, " deviceGroup ", deviceGroup, " size_gb ", requestGb)
s.mu.Lock()
defer s.mu.Unlock()

lv := &carinav1.LogicVolume{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -249,6 +250,10 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
return err
}

if !lv.GetDeletionTimestamp().IsZero() {
return errors.New("lv is being deleted")
}

err = s.Delete(ctx, lv)
if err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -257,6 +262,13 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
return err
}

// if the node doesn't exist, return directly
existingNode := new(corev1.Node)
err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode)
if err != nil {
return err
}

// wait until delete the target volume
for {
log.Info("waiting for delete LogicalVolume name ", lv.Name)
Expand All @@ -280,14 +292,19 @@ func (s *LogicVolumeService) DeleteVolume(ctx context.Context, volumeID string)
// ExpandVolume expands volume
func (s *LogicVolumeService) ExpandVolume(ctx context.Context, volumeID string, requestGb int64) error {
log.Info("k8s.ExpandVolume called volumeID ", volumeID, " requestGb ", requestGb)
s.mu.Lock()
defer s.mu.Unlock()

lv, err := s.GetLogicVolumeByVolumeId(ctx, volumeID)
if err != nil {
return err
}

// if the node doesn't exist, return directly
existingNode := new(corev1.Node)
err = s.getter.Get(ctx, client.ObjectKey{Name: lv.Spec.NodeName}, existingNode)
if err != nil {
return err
}

err = s.UpdateLogicVolumeSpecSize(ctx, volumeID, resource.NewQuantity(requestGb<<30, resource.BinarySI))
if err != nil {
return err
Expand Down Expand Up @@ -331,8 +348,8 @@ func (s *LogicVolumeService) GetLogicVolumeByVolumeId(ctx context.Context, volum
}

// GetLogicVolumesByNodeName returns logicVolumes by node name.
func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string) ([]*carinav1.LogicVolume, error) {
return s.lvGetter.GetByNodeName(ctx, nodeName)
func (s *LogicVolumeService) GetLogicVolumesByNodeName(ctx context.Context, nodeName string, tryReader bool) ([]*carinav1.LogicVolume, error) {
return s.lvGetter.GetByNodeName(ctx, nodeName, tryReader)
}

// UpdateLogicVolumeCurrentSize UpdateCurrentSize updates .Status.CurrentSize of LogicVolume.
Expand Down
22 changes: 12 additions & 10 deletions pkg/csidriver/driver/k8s/node_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ import (
"context"
"errors"
"fmt"
"github.com/carina-io/carina"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/csidriver/driver/util"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
"sort"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sort"
"strings"

"github.com/carina-io/carina"
carinav1beta1 "github.com/carina-io/carina/api/v1beta1"
"github.com/carina-io/carina/getter"
"github.com/carina-io/carina/pkg/configuration"
"github.com/carina-io/carina/pkg/csidriver/driver/util"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
)

// This annotation is present on K8s 1.11 release.
Expand Down Expand Up @@ -71,7 +73,7 @@ func NewNodeService(mgr manager.Manager, lvService *LogicVolumeService) *NodeSer

func (n NodeService) getLvExclusivityDisks(ctx context.Context, nodeName string) ([]string, error) {
lvExclusivityDisks := []string{}
lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName)
lvs, err := n.lvService.GetLogicVolumesByNodeName(ctx, nodeName, true)
if err != nil {
return nil, status.Errorf(codes.Internal, "can not get logic volumes for nodeName %s", nodeName)
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/devicemanager/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func parseUdevInfo(output string) map[string]string {
}
return result
}

func (ld *LocalPartitionImplement) CreatePartition(name, groups string, size uint64) error {
partition, _ := ld.GetPartition(name, groups)
if partition.Name == name {
Expand Down Expand Up @@ -416,6 +417,7 @@ func parseDiskString(diskString string) []*types.LocalDisk {

diskString = strings.ReplaceAll(diskString, "\"", "")
//diskString = strings.ReplaceAll(diskString, " ", "")
parentDisk := map[string]int8{}

blksList := strings.Split(diskString, "\n")
for _, blks := range blksList {
Expand Down Expand Up @@ -447,6 +449,7 @@ func parseDiskString(diskString string) []*types.LocalDisk {
tmp.Filesystem = k[1]
case "PKNAME":
tmp.ParentName = k[1]
parentDisk[tmp.ParentName] = 1
case "MAJ:MIN":
tmp.DeviceNumber = k[1]
default:
Expand All @@ -456,8 +459,13 @@ func parseDiskString(diskString string) []*types.LocalDisk {

resp = append(resp, &tmp)
}
return resp

for _, res := range resp {
if _, ok := parentDisk[res.Name]; ok {
res.HavePartitions = true
}
}
return resp
}

func filter(disklist []*types.LocalDisk) (diskList []*types.LocalDisk) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/devicemanager/types/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ type LocalDisk struct {
ParentName string `json:"parentName"`
// Device number
DeviceNumber string `json:"deviceNumber"`
// Have partitions
HavePartitions bool `json:"havePartitions"`
}
6 changes: 4 additions & 2 deletions pkg/metrics/volume_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package metrics
import (
"context"
"errors"
"github.com/carina-io/carina/pkg/csidriver/driver/k8s"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/procfs/blockdevice"

"github.com/carina-io/carina/pkg/csidriver/driver/k8s"
)

const (
Expand Down Expand Up @@ -138,7 +140,7 @@ func (v *volumeStatsCollector) Update(ch chan<- prometheus.Metric) error {
if err != nil {
return errors.New("couldn't get diskstats:" + err.Error())
}
logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName)
logicVolumes, err := v.lvService.GetLogicVolumesByNodeName(context.Background(), nodeName, false)
if err != nil {
return err
}
Expand Down
13 changes: 5 additions & 8 deletions runners/devicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package runners

import (
"context"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"k8s.io/apimachinery/pkg/api/equality"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/manager"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/carina-io/carina/pkg/configuration"
deviceManager "github.com/carina-io/carina/pkg/devicemanager"
"github.com/carina-io/carina/pkg/devicemanager/types"
"github.com/carina-io/carina/utils"
"github.com/carina-io/carina/utils/log"
Expand Down Expand Up @@ -218,10 +219,6 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec
return blockClass, nil
}

parentDisk := map[string]int8{}
for _, d := range localDisk {
parentDisk[d.ParentName] = 1
}
// If the disk has been added to a VG group, add it to this vg group
hasMatchedDisk := map[string]int8{}

Expand All @@ -238,7 +235,7 @@ func (dc *deviceCheck) discoverDisk(diskClass map[string]configuration.DiskSelec
// 过滤出空块设备
for _, d := range localDisk {
// 如果是其他磁盘Parent直接跳过
if _, ok := parentDisk[d.Name]; ok {
if d.HavePartitions {
continue
}

Expand Down
Loading

0 comments on commit 4ad466b

Please sign in to comment.