Skip to content

Commit

Permalink
ref: Respect idempotency in CreateVolume(...) (#142)
Browse files Browse the repository at this point in the history
* Respect idempotency in CreateVolume(...)

* %w -> %s

* Extend the polling timeout for cloned volumes
  • Loading branch information
lgarber-akamai authored Dec 11, 2023
1 parent 4fa9814 commit 2d8467e
Showing 1 changed file with 86 additions and 76 deletions.
162 changes: 86 additions & 76 deletions pkg/linode-bs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
driverName = "linodebs.csi.linode.com"
devicePathKey = "devicePath"
waitTimeout = 300
cloneReadinessTimeout = 900
minProviderVolumeBytes = 10 * gigabyte

// VolumeTags is a comma seperated string used to pass information to the linode APIs to tag the
Expand Down Expand Up @@ -76,14 +77,6 @@ func (linodeCS *LinodeControllerServer) CreateVolume(ctx context.Context, req *c
return nil, status.Error(codes.Internal, err.Error())
}

// Attempt to get info about the source volume.
// sourceVolumeInfo will be null if no content source is defined.
contentSource := req.GetVolumeContentSource()
sourceVolumeInfo, err := linodeCS.attemptGetContentSourceVolume(ctx, contentSource)
if err != nil {
return nil, err
}

// to avoid mangled requests for existing volumes with hyphen,
// we only strip them out on creation when k8s invented the name
// this is still problematic because we strip "-" from volume-name-prefixes
Expand All @@ -101,16 +94,6 @@ func (linodeCS *LinodeControllerServer) CreateVolume(ctx context.Context, req *c
"volume_name": volumeName,
})

jsonFilter, err := json.Marshal(map[string]string{"label": volumeName})
if err != nil {
return nil, err
}

volumes, err := linodeCS.CloudProvider.ListVolumes(ctx, linodego.NewListOptions(0, string(jsonFilter)))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

volumeContext := make(map[string]string)
if req.Parameters[LuksEncryptedAttribute] == "true" {
// if luks encryption is enabled add a volume context
Expand All @@ -120,47 +103,54 @@ func (linodeCS *LinodeControllerServer) CreateVolume(ctx context.Context, req *c
volumeContext[LuksKeySizeAttribute] = req.Parameters[LuksKeySizeAttribute]
}

tags := req.Parameters[VolumeTags]

if len(volumes) != 0 {
if len(volumes) > 1 {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("duplicate volume %q exists", volumeName))
}
volume := volumes[0]
if int64(volume.Size*gigabyte) != size {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("invalid option requested size: %d", size))
}
targetSizeGB := int(size / gigabyte)

key := common.CreateLinodeVolumeKey(volume.ID, volume.Label)
// Attempt to get info about the source volume.
// sourceVolumeInfo will be null if no content source is defined.
contentSource := req.GetVolumeContentSource()
sourceVolumeInfo, err := linodeCS.attemptGetContentSourceVolume(
ctx,
contentSource,
)
if err != nil {
return nil, err
}

klog.V(4).Info("volume already created")
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: key.GetVolumeKey(),
CapacityBytes: int64(volume.Size * gigabyte),
VolumeContext: volumeContext,
},
}, nil
// Attempt to create the volume while respecting idempotency
vol, err := linodeCS.attemptCreateLinodeVolume(
ctx,
volumeName,
targetSizeGB,
req.Parameters[VolumeTags],
sourceVolumeInfo,
)
if err != nil {
return nil, err
}

var vol *linodego.Volume
volumeSizeGB := int(size / gigabyte)
// Attempt to resize the volume if necessary
if vol.Size != targetSizeGB {
klog.V(4).Infoln("resizing volume", map[string]interface{}{
"volume_id": vol.ID,
"old_size": vol.Size,
"new_size": targetSizeGB,
})

if sourceVolumeInfo != nil {
// Clone the volume
vol, err = linodeCS.cloneLinodeVolume(ctx, volumeName, volumeSizeGB, sourceVolumeInfo.VolumeID)
} else {
// Create the volume from scratch
vol, err = linodeCS.createLinodeVolume(ctx, volumeName, volumeSizeGB, tags)
if err := linodeCS.CloudProvider.ResizeVolume(ctx, vol.ID, targetSizeGB); err != nil {
return nil, status.Errorf(codes.Internal, "failed to resize cloned volume (%d): %s", targetSizeGB, err)
}
}

// Error handling for the above function calls
if err != nil {
return nil, err
statusPollTimeout := waitTimeout

// If we're cloning the volume we should extend the timeout
if sourceVolumeInfo != nil {
statusPollTimeout = cloneReadinessTimeout
}

if err != nil {
return nil, status.Error(codes.Internal, err.Error())
if _, err := linodeCS.CloudProvider.WaitForVolumeStatus(
ctx, vol.ID, linodego.VolumeActive, statusPollTimeout); err != nil {
return nil, status.Errorf(codes.Internal, "failed to wait for volume (%d) active: %s", vol.ID, err)
}

klog.V(4).Infoln("volume active", map[string]interface{}{"vol": vol})
Expand Down Expand Up @@ -192,7 +182,7 @@ func (linodeCS *LinodeControllerServer) CreateVolume(ctx context.Context, req *c
}
}

klog.V(4).Infoln("volume created", map[string]interface{}{"response": resp})
klog.V(4).Infoln("volume finished creation", map[string]interface{}{"response": resp})
return resp, nil
}

Expand Down Expand Up @@ -564,6 +554,43 @@ func (linodeCS *LinodeControllerServer) attemptGetContentSourceVolume(
return volumeInfo, nil
}

// attemptCreateLinodeVolume attempts to create a volume while respecting
// idempotency.
func (linodeCS *LinodeControllerServer) attemptCreateLinodeVolume(
ctx context.Context,
label string,
sizeGB int,
tags string,
sourceVolume *common.LinodeVolumeKey,
) (*linodego.Volume, error) {
// List existing volumes
jsonFilter, err := json.Marshal(map[string]string{"label": label})
if err != nil {
return nil, err
}

volumes, err := linodeCS.CloudProvider.ListVolumes(ctx, linodego.NewListOptions(0, string(jsonFilter)))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// This shouldn't happen, but raise an error just in case
if len(volumes) > 1 {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("duplicate volume %q exists", label))
}

// Volume already exists
if len(volumes) == 1 {
return &volumes[0], nil
}

if sourceVolume != nil {
return linodeCS.cloneLinodeVolume(ctx, label, sizeGB, sourceVolume.VolumeID)
}

return linodeCS.createLinodeVolume(ctx, label, sizeGB, tags)
}

// createLinodeVolume creates a Linode volume and returns the result
func (linodeCS *LinodeControllerServer) createLinodeVolume(
ctx context.Context, label string, sizeGB int, tags string) (*linodego.Volume, error) {
Expand All @@ -582,13 +609,10 @@ func (linodeCS *LinodeControllerServer) createLinodeVolume(

result, err := linodeCS.CloudProvider.CreateVolume(ctx, volumeReq)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if _, err := linodeCS.CloudProvider.WaitForVolumeStatus(
ctx, result.ID, linodego.VolumeActive, waitTimeout); err != nil {
return nil, status.Error(
codes.Internal, fmt.Sprintf("failed to wait for fresh volume to be active: %s", err))
return nil, status.Errorf(
codes.Internal,
"failed to create linode volume: %s", err,
)
}

return result, nil
Expand All @@ -603,24 +627,10 @@ func (linodeCS *LinodeControllerServer) cloneLinodeVolume(

result, err := linodeCS.CloudProvider.CloneVolume(ctx, sourceID, label)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if _, err := linodeCS.CloudProvider.WaitForVolumeStatus(
ctx, result.ID, linodego.VolumeActive, waitTimeout); err != nil {
return nil, status.Errorf(codes.Internal, "failed to wait for cloned volume (%d) active: %s", result.ID, err)
}

if result.Size != sizeGB {
klog.V(4).Infoln("resizing volume", map[string]interface{}{
"volume_id": result.ID,
"old_size": result.Size,
"new_size": sizeGB,
})

if err := linodeCS.CloudProvider.ResizeVolume(ctx, result.ID, sizeGB); err != nil {
return nil, status.Errorf(codes.Internal, "failed to resize cloned volume (%d): %s", result.ID, err)
}
return nil, status.Errorf(
codes.Internal,
"failed to clone linode volume %d into new volume: %s", sourceID, err,
)
}

return result, nil
Expand Down

0 comments on commit 2d8467e

Please sign in to comment.