Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provide hostname for zonal disk insert API #1849

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

Expand Down Expand Up @@ -72,6 +74,7 @@ var (
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
enableVMLocationHint = flag.Bool("enable-vm-location-hint", false, "If set to true, the location hint field for create volume request will have hostname set")

multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
multiZoneVolumeHandleEnableFlag = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled")
Expand Down Expand Up @@ -244,7 +247,20 @@ func handle() {
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
var kubeClient *kubernetes.Clientset
if *enableVMLocationHint {
cfg, err := rest.InClusterConfig()
if err != nil {
klog.Fatalf("Could not fetch in-cluster config: %v", err.Error())
}

kubeClient, err = kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Could not fetch in-cluster client: %v", err.Error())
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, kubeClient, identityServer, controllerServer, nodeServer)
if err != nil {
klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4=
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
Expand Down
7 changes: 6 additions & 1 deletion pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *
return ValidateDiskParameters(resp, params)
}

func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error {
func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error {
if disk, ok := cloud.disks[volKey.String()]; ok {
err := cloud.ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
Expand All @@ -242,6 +242,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
Labels: params.Labels,
ProvisionedIops: params.ProvisionedIOPSOnCreate,
ProvisionedThroughput: params.ProvisionedThroughputOnCreate,
LocationHint: cloud.GetLocationHintURI(project, volKey.Zone, hostName),
}

if snapshotID != "" {
Expand Down Expand Up @@ -382,6 +383,10 @@ func (cloud *FakeCloudProvider) GetDiskTypeURI(project string, volKey *meta.Key,
}
}

func (cloud *FakeCloudProvider) GetLocationHintURI(project, zone, hostName string) string {
return fmt.Sprintf(locationHintURITemplate, project, zone, hostName)
}

func (cloud *FakeCloudProvider) getZonalDiskTypeURI(project, zone, diskType string) string {
return fmt.Sprintf(diskTypeURITemplateSingleZone, project, zone, diskType)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type GCECompute interface {
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error)
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error
DeleteDisk(ctx context.Context, project string, volumeKey *meta.Key) error
UpdateDisk(ctx context.Context, project string, volKey *meta.Key, existingDisk *CloudDisk, params common.ModifyVolumeParameters) error
AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string, forceAttach bool) error
Expand Down Expand Up @@ -436,7 +436,7 @@ func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error
return nil
}

func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error {
func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error {
klog.V(5).Infof("Inserting disk %v", volKey)

description, err := encodeTags(params.Tags)
Expand All @@ -449,7 +449,7 @@ func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volK
if description == "" {
description = "Disk created by GCE-PD CSI Driver"
}
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode)
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode, hostName)
case meta.Regional:
if description == "" {
description = "Regional disk created by GCE-PD CSI Driver"
Expand Down Expand Up @@ -772,7 +772,7 @@ func (cloud *CloudProvider) insertZonalDisk(
volumeContentSourceVolumeID string,
description string,
multiWriter bool,
accessMode string) error {
accessMode, hostName string) error {
var (
err error
opName string
Expand All @@ -788,6 +788,7 @@ func (cloud *CloudProvider) insertZonalDisk(
Description: description,
Type: cloud.GetDiskTypeURI(project, volKey, params.DiskType),
Labels: params.Labels,
LocationHint: cloud.GetLocationHintURI(project, volKey.Zone, hostName),
}

if params.ProvisionedIOPSOnCreate > 0 {
Expand Down Expand Up @@ -1107,6 +1108,10 @@ func (cloud *CloudProvider) GetDiskTypeURI(project string, volKey *meta.Key, dis
}
}

func (cloud *CloudProvider) GetLocationHintURI(project, zone, hostName string) string {
return fmt.Sprintf(locationHintURITemplate, project, zone, hostName)
}

func (cloud *CloudProvider) getZonalDiskTypeURI(project string, zone, diskType string) string {
return cloud.service.BasePath + fmt.Sprintf(diskTypeURITemplateSingleZone, project, zone, diskType)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/gce-cloud-provider/compute/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
diskTypeURITemplateRegional = "projects/%s/regions/%s/diskTypes/%s" // {gce.projectID}/regions/{disk.Region}/diskTypes/{disk.Type}"

regionURITemplate = "projects/%s/regions/%s"
locationHintURITemplate = "projects/%s/zone/%s/instance/%s"

replicaZoneURITemplateSingleZone = "projects/%s/zones/%s" // {gce.projectID}/zones/{disk.Zone}
EnvironmentStaging Environment = "staging"
Expand Down
37 changes: 31 additions & 6 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
Expand Down Expand Up @@ -208,6 +210,8 @@ const (
listDisksUsersField = googleapi.Field("items/users")

readOnlyManyAccessMode = "READ_ONLY_MANY"

annSelectedNode = "volume.kubernetes.io/selected-node"
)

var (
Expand Down Expand Up @@ -720,21 +724,23 @@ func (gceCS *GCEControllerServer) createSingleDisk(ctx context.Context, req *csi
// Create the disk
var disk *gce.CloudDisk
name := req.GetName()
pvcName, pvcNamespace := req.Parameters[common.ParameterKeyPVCName], req.Parameters[common.ParameterKeyPVCNamespace]
hostName := getHostNameFromPVC(ctx, pvcName, pvcNamespace, gceCS.Driver.kubeClient)

switch params.ReplicationType {
case replicationTypeNone:
if len(zones) != 1 {
return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a single zone for creating zonal disk, instead got: %v", zones)
}
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err)
}
case replicationTypeRegionalPD:
if len(zones) != 2 {
return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a 2 zones for creating regional disk, instead got: %v", zones)
}
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, "")
if err != nil {
return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err)
}
Expand Down Expand Up @@ -872,6 +878,25 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
return gceCS.deleteSingleDeviceDisk(ctx, req, project, volKey)
}

func getHostNameFromPVC(ctx context.Context, pvcName, pvcNamespace string, client kubernetes.Interface) string {
if client == nil {
// Client is initialized only when location hint is enabled
return ""
}

pvc, err := client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get persistent volume claim %s: %v", pvcName, err)
// Don't return the error as create volume request can be continued without hostname
return ""
}
if hostName, ok := pvc.Annotations[annSelectedNode]; ok {
klog.V(4).Infof("Retrieved hostname %q from PVC %v", hostName, pvcName)
return hostName
Rishita-Golla marked this conversation as resolved.
Show resolved Hide resolved
}
return ""
}

func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion {
if multiWriter {
return gce.GCEAPIVersionBeta
Expand Down Expand Up @@ -2482,7 +2507,7 @@ func getResourceId(resourceLink string) (string, error) {
return strings.Join(elts[3:], "/"), nil
}

func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) {
func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
region, err := common.GetRegionFromZones(zones)
if err != nil {
Expand All @@ -2495,7 +2520,7 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
fullyQualifiedReplicaZones, cloudProvider.GetReplicaZoneURI(project, replicaZone))
}

err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, fmt.Errorf("failed to insert regional disk: %w", err)
}
Expand All @@ -2512,13 +2537,13 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
return disk, nil
}

func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) {
func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
if len(zones) != 1 {
return nil, fmt.Errorf("got wrong number of zones for zonal create volume: %v", len(zones))
}
diskZone := zones[0]
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, fmt.Errorf("failed to insert zonal disk: %w", err)
}
Expand Down
Loading