From 6a5d8b01ecc0f8d83890899c827b2a13939d3eee Mon Sep 17 00:00:00 2001 From: Kristina Solovyova Date: Tue, 16 Jan 2024 11:05:20 +0200 Subject: [PATCH] feat: refactor scale_up, consider vmss refresh mode in other funcs --- README.md | 3 +- blob.tf | 5 +- function-app/code/common/common.go | 11 + function-app/code/common/vmss_config.go | 13 +- .../code/functions/clusterize/clusterize.go | 17 +- .../join_finalization/join_finalization.go | 4 + .../code/functions/protect/protect.go | 4 + function-app/code/functions/resize/resize.go | 5 + .../code/functions/scale_up/scale_up.go | 267 ++++++++++-------- function-app/code/functions/status/status.go | 4 +- functions.tf | 1 + variables.tf | 13 +- 12 files changed, 196 insertions(+), 151 deletions(-) diff --git a/README.md b/README.md index c22f9f46..279604d6 100644 --- a/README.md +++ b/README.md @@ -408,7 +408,7 @@ proxy_url = VALUE | [function\_app\_storage\_account\_prefix](#input\_function\_app\_storage\_account\_prefix) | Weka storage account name prefix | `string` | `"weka"` | no | | [function\_app\_subnet\_delegation\_cidr](#input\_function\_app\_subnet\_delegation\_cidr) | Subnet delegation enables you to designate a specific subnet for an Azure PaaS service. | `string` | `"10.0.1.0/25"` | no | | [function\_app\_subnet\_delegation\_id](#input\_function\_app\_subnet\_delegation\_id) | Required to specify if subnet\_name were used to specify pre-defined subnets for weka. Function subnet delegation requires an additional subnet, and in the case of pre-defined networking this one also should be pre-defined | `string` | `""` | no | -| [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"60ec8c21cc5fd2b21d698fa9bb9e69ac"` | no | +| [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"93a261d4128c2c3aa7809fecc4b94746"` | no | | [get\_weka\_io\_token](#input\_get\_weka\_io\_token) | The token to download the Weka release from get.weka.io. | `string` | `""` | no | | [hotspare](#input\_hotspare) | Number of hotspares to set on weka cluster. Refer to https://docs.weka.io/overview/ssd-capacity-management#hot-spare | `number` | `1` | no | | [install\_cluster\_dpdk](#input\_install\_cluster\_dpdk) | Install weka cluster with DPDK | `bool` | `true` | no | @@ -459,6 +459,7 @@ proxy_url = VALUE | [tiering\_obs\_name](#input\_tiering\_obs\_name) | Name of existing obs storage account | `string` | `""` | no | | [traces\_per\_ionode](#input\_traces\_per\_ionode) | The number of traces per ionode. Traces are low-level events generated by Weka processes and are used as troubleshooting information for support purposes. | `number` | `10` | no | | [vm\_username](#input\_vm\_username) | Provided as part of output for automated use of terraform, in case of custom AMI and automated use of outputs replace this with user that should be used for ssh connection | `string` | `"weka"` | no | +| [vmss\_instances\_adding\_step](#input\_vmss\_instances\_adding\_step) | Number of instances to add to vmss in one iteration | `number` | `3` | no | | [vmss\_single\_placement\_group](#input\_vmss\_single\_placement\_group) | Sets single\_placement\_group option for vmss. If true, a scale set is composed of a single placement group, and has a range of 0-100 VMs. | `bool` | `true` | no | | [vnet\_name](#input\_vnet\_name) | The virtual network name. | `string` | `""` | no | | [vnet\_rg\_name](#input\_vnet\_rg\_name) | Resource group name of vnet. 
Will be used when vnet\_name is not provided. | `string` | `""` | no |

diff --git a/blob.tf b/blob.tf
index d8600a2c..5cf86f3f 100644
--- a/blob.tf
+++ b/blob.tf
@@ -52,10 +52,9 @@ resource "azurerm_storage_blob" "vmss_state" {
   type                   = "Block"

   source_content = jsonencode({
-    vmss_created   = false
     vmss_version   = 0
-    refresh_status = 0
-    current_config = {}
+    refresh_status = "none"
+    current_config = null
   })

   lifecycle {
diff --git a/function-app/code/common/common.go b/function-app/code/common/common.go
index c97d0aec..e13adcdf 100644
--- a/function-app/code/common/common.go
+++ b/function-app/code/common/common.go
@@ -748,6 +748,17 @@ func getScaleSet(ctx context.Context, subscriptionId, resourceGroupName, vmScale
 	return &scaleSet.VirtualMachineScaleSet, nil
 }

+func GetScaleSetOrNilOnNotFound(ctx context.Context, subscriptionId, resourceGroupName, vmScaleSetName string) (*armcompute.VirtualMachineScaleSet, error) {
+	vmss, err := getScaleSet(ctx, subscriptionId, resourceGroupName, vmScaleSetName)
+	if err != nil {
+		if getErr, ok := err.(*azcore.ResponseError); ok && getErr.ErrorCode == "ResourceNotFound" {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return vmss, nil
+}
+
 // Gets single scale set info
 func GetScaleSetInfo(ctx context.Context, subscriptionId, resourceGroupName, vmScaleSetName, keyVaultUri string) (*ScaleSetInfo, error) {
 	logger := logging.LoggerFromCtx(ctx)
diff --git a/function-app/code/common/vmss_config.go b/function-app/code/common/vmss_config.go
index 814067a2..08386026 100644
--- a/function-app/code/common/vmss_config.go
+++ b/function-app/code/common/vmss_config.go
@@ -95,20 +95,15 @@ func VmssConfigsDiff(old, new *VMSSConfig) string {
 	return cmp.Diff(new, old) // arguments order: (want, got)
 }

-type RefreshStatus uint8
+type RefreshStatus string

 const (
-	RefreshNone       RefreshStatus = iota
-	RefreshInProgress RefreshStatus = iota
-	RefreshNeeded     RefreshStatus = iota
+	RefreshNone       RefreshStatus = "none"
+	RefreshInProgress RefreshStatus = "in_progress"
+	RefreshNeeded     RefreshStatus = "needed"
 )

-func (s *RefreshStatus) String() string {
-	return []string{"none", "in_progress", "needed"}[*s]
-}
-
 type VMSSState struct {
-	VmssCreated   bool          `json:"vmss_created"`
 	VmssVersion   uint16        `json:"vmss_version"`
 	RefreshStatus RefreshStatus `json:"refresh_status"`
 	CurrentConfig *VMSSConfig   `json:"current_config,omitempty"`
diff --git a/function-app/code/functions/clusterize/clusterize.go b/function-app/code/functions/clusterize/clusterize.go
index 504780e7..9e679016 100644
--- a/function-app/code/functions/clusterize/clusterize.go
+++ b/function-app/code/functions/clusterize/clusterize.go
@@ -58,7 +58,6 @@ type ClusterizationParams struct {
 	Location          string
 	Prefix            string
 	KeyVaultUri       string
-	VmssVersion       uint16

 	StateContainerName string
 	StateStorageName   string
@@ -97,7 +96,8 @@ func HandleLastClusterVm(ctx context.Context, state protocol.ClusterState, p Clu
 	logger := logging.LoggerFromCtx(ctx)

 	logger.Info().Msg("This is the last instance in the cluster, creating obs and clusterization script")
-	vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, p.VmssVersion)
+	version := 0 // at the clusterization step the vmss version is guaranteed to be 0, as no refresh has happened yet
+	vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, uint16(version))

 	if p.Cluster.SetObs {
 		if p.Obs.AccessKey == "" {
@@ -178,7 +178,8 @@ func Clusterize(ctx context.Context, p ClusterizationParams) (clusterizeScript s

 	instanceName := strings.Split(p.VmName, ":")[0] 
instanceId := common.GetScaleSetVmIndex(instanceName)
-	vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, p.VmssVersion)
+	version := 0 // at the clusterization step the vmss version is guaranteed to be 0, as no refresh has happened yet
+	vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, uint16(version))
 	vmName := p.VmName

 	ip, err := common.GetPublicIp(ctx, p.SubscriptionId, p.ResourceGroupName, vmScaleSetName, p.Prefix, p.Cluster.ClusterName, instanceId)
@@ -227,7 +228,6 @@ func Clusterize(ctx context.Context, p ClusterizationParams) (clusterizeScript s
 func Handler(w http.ResponseWriter, r *http.Request) {
 	stateContainerName := os.Getenv("STATE_CONTAINER_NAME")
 	stateStorageName := os.Getenv("STATE_STORAGE_NAME")
-	vmssStateStorageName := os.Getenv("VMSS_STATE_STORAGE_NAME")
 	hostsNum, _ := strconv.Atoi(os.Getenv("HOSTS_NUM"))
 	clusterName := os.Getenv("CLUSTER_NAME")
 	subscriptionId := os.Getenv("SUBSCRIPTION_ID")
@@ -284,21 +284,12 @@ func Handler(w http.ResponseWriter, r *http.Request) {
 		return
 	}

-	vmssState, err := common.ReadVmssState(ctx, vmssStateStorageName, stateContainerName)
-	if err != nil {
-		err = fmt.Errorf("cannot read vmss state to read get vmss version: %v", err)
-		logger.Error().Err(err).Send()
-		common.WriteErrorResponse(w, err)
-		return
-	}
-
 	params := ClusterizationParams{
 		SubscriptionId:     subscriptionId,
 		ResourceGroupName:  resourceGroupName,
 		Location:           location,
 		Prefix:             prefix,
 		KeyVaultUri:        keyVaultUri,
-		VmssVersion:        vmssState.VmssVersion,
 		StateContainerName: stateContainerName,
 		StateStorageName:   stateStorageName,
 		VmName:             data.Vm,
diff --git a/function-app/code/functions/join_finalization/join_finalization.go b/function-app/code/functions/join_finalization/join_finalization.go
index 7908ef75..2e78ac88 100644
--- a/function-app/code/functions/join_finalization/join_finalization.go
+++ b/function-app/code/functions/join_finalization/join_finalization.go
@@ -60,6 +60,10 @@ func Handler(w http.ResponseWriter, r *http.Request) {
 	}

 	vmScaleSetName := common.GetVmScaleSetName(prefix, clusterName, vmssState.VmssVersion)
+	// use the refresh vmss name if the cluster is in refresh mode
+	if vmssState.RefreshStatus != common.RefreshNone {
+		vmScaleSetName = common.GetRefreshVmssName(vmScaleSetName, vmssState.VmssVersion)
+	}

 	err = common.SetDeletionProtection(ctx, subscriptionId, resourceGroupName, vmScaleSetName, common.GetScaleSetVmIndex(data.Name), true)
 	if err != nil {
diff --git a/function-app/code/functions/protect/protect.go b/function-app/code/functions/protect/protect.go
index 9ebdcd04..3ab676af 100644
--- a/function-app/code/functions/protect/protect.go
+++ b/function-app/code/functions/protect/protect.go
@@ -63,6 +63,10 @@ func Handler(w http.ResponseWriter, r *http.Request) {
 	}

 	vmScaleSetName := common.GetVmScaleSetName(prefix, clusterName, vmssState.VmssVersion)
+	// use the refresh vmss name if the cluster is in refresh mode
+	if vmssState.RefreshStatus != common.RefreshNone {
+		vmScaleSetName = common.GetRefreshVmssName(vmScaleSetName, vmssState.VmssVersion)
+	}

 	instanceName := strings.Split(data.Vm, ":")[0]
 	hostName := strings.Split(data.Vm, ":")[1]
diff --git a/function-app/code/functions/resize/resize.go b/function-app/code/functions/resize/resize.go
index 15a69155..4d00b870 100644
--- a/function-app/code/functions/resize/resize.go
+++ b/function-app/code/functions/resize/resize.go
@@ -77,6 +77,11 @@ func Handler(w http.ResponseWriter, r *http.Request) {
 	}

 	vmssName := common.GetVmScaleSetName(prefix, clusterName, 
vmssState.VmssVersion) + // use the refresh vmss name if the cluster is in refresh mode + if vmssState.RefreshStatus != common.RefreshNone { + vmssName = common.GetRefreshVmssName(vmssName, vmssState.VmssVersion) + } + err = updateDesiredClusterSize(ctx, *size.Value, subscriptionId, resourceGroupName, vmssName, stateContainerName, stateStorageName) if err != nil { logger.Error().Err(err).Send() diff --git a/function-app/code/functions/scale_up/scale_up.go b/function-app/code/functions/scale_up/scale_up.go index d787d68e..7c98c4f6 100644 --- a/function-app/code/functions/scale_up/scale_up.go +++ b/function-app/code/functions/scale_up/scale_up.go @@ -5,37 +5,49 @@ import ( "fmt" "net/http" "os" + "strconv" "weka-deployment/common" "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5" "github.com/weka/go-cloud-lib/logging" + "github.com/weka/go-cloud-lib/protocol" ) -const RefreshVmssInstancesAddingStep = 10 - -type ScaleUpEnvParams struct { - StateContainerName string - StateStorageName string - VmssStateStorageName string - SubscriptionId string - ResourceGroupName string - KeyVaultUri string - VmssName string - RefreshVmssName string - DesiredSize int +var ( + instancesAddingStep = 3 + + functionAppName = os.Getenv("FUNCTION_APP_NAME") + vmssStateStorageName = os.Getenv("VMSS_STATE_STORAGE_NAME") + stateStorageName = os.Getenv("STATE_STORAGE_NAME") + stateContainerName = os.Getenv("STATE_CONTAINER_NAME") + prefix = os.Getenv("PREFIX") + clusterName = os.Getenv("CLUSTER_NAME") + subscriptionId = os.Getenv("SUBSCRIPTION_ID") + resourceGroupName = os.Getenv("RESOURCE_GROUP_NAME") +) + +func init() { + vmssInstancesAddingStep := os.Getenv("VMSS_INSTANCES_ADDING_STEP") + step, _ := strconv.Atoi(vmssInstancesAddingStep) + if step > 0 { + instancesAddingStep = step + } +} + +type ScaleUpParams struct { + VmssName string + RefreshVmssName string + DesiredSize int + Vmss *armcompute.VirtualMachineScaleSet + RefreshVmss *armcompute.VirtualMachineScaleSet } func Handler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := logging.LoggerFromCtx(ctx) - vmssStateStorageName := os.Getenv("VMSS_STATE_STORAGE_NAME") - stateStorageName := os.Getenv("STATE_STORAGE_NAME") - stateContainerName := os.Getenv("STATE_CONTAINER_NAME") - prefix := os.Getenv("PREFIX") - clusterName := os.Getenv("CLUSTER_NAME") - state, err := common.ReadState(ctx, stateStorageName, stateContainerName) if err != nil { logger.Error().Err(err).Msg("cannot read state") @@ -54,47 +66,48 @@ func Handler(w http.ResponseWriter, r *http.Request) { vmssName := common.GetVmScaleSetName(prefix, clusterName, version) refreshVmssName := common.GetRefreshVmssName(vmssName, version) - envParams := ScaleUpEnvParams{ - StateContainerName: stateContainerName, - StateStorageName: stateStorageName, - VmssStateStorageName: vmssStateStorageName, - SubscriptionId: os.Getenv("SUBSCRIPTION_ID"), - ResourceGroupName: os.Getenv("RESOURCE_GROUP_NAME"), - KeyVaultUri: os.Getenv("KEY_VAULT_URI"), - VmssName: vmssName, - RefreshVmssName: refreshVmssName, - DesiredSize: state.DesiredSize, + vmss, err := common.GetScaleSetOrNilOnNotFound(ctx, subscriptionId, resourceGroupName, vmssName) + if err != nil { + logger.Error().Err(err).Msgf("cannot get vmss %s", vmssName) + common.WriteErrorResponse(w, err) + return } - // initiale vmss creation if needed - if !vmssState.VmssCreated { - err := HandleVmssCreate(ctx, &envParams, &vmssState) - if err != nil { - 
logger.Error().Err(err).Msgf("cannot create vmss %s", envParams.VmssName) - common.WriteErrorResponse(w, err) - return - } - common.WriteSuccessResponse(w, fmt.Sprintf("created vmss %s successfully", envParams.VmssName)) - return + scaleUpParams := ScaleUpParams{ + VmssName: vmssName, + RefreshVmssName: refreshVmssName, + DesiredSize: state.DesiredSize, + Vmss: vmss, } // after vmss creation we need to wait until vmss is clusterized - if !state.Clusterized { + if vmss != nil && !state.Clusterized { common.WriteSuccessResponse(w, "Not clusterized yet, skipping...") return } // get expected vmss config - vmssConfig, err := common.ReadVmssConfig(ctx, envParams.VmssStateStorageName, envParams.StateContainerName) + vmssConfig, err := common.ReadVmssConfig(ctx, vmssStateStorageName, stateContainerName) if err != nil { logger.Error().Err(err).Msgf("cannot read vmss config") common.WriteErrorResponse(w, err) return } - // handle vmss refresh if needed + // 1. Initial VMSS creation flow: initiale vmss creation if needed + if vmssState.CurrentConfig == nil && vmss == nil { + err := createInitialVmss(ctx, vmssName, &vmssConfig, &vmssState, &state) + if err != nil { + common.WriteErrorResponse(w, err) + } else { + common.WriteSuccessResponse(w, fmt.Sprintf("created vmss %s successfully", vmssName)) + } + return + } + + // 2. Refresh flow: handle vmss refresh if needed if vmssState.RefreshStatus != common.RefreshNone { - err := HandleVmssRefresh(ctx, &envParams, &vmssConfig, &vmssState) + err := HandleVmssRefresh(ctx, &scaleUpParams, &vmssConfig, &vmssState) if err != nil { common.WriteErrorResponse(w, err) return @@ -103,11 +116,11 @@ func Handler(w http.ResponseWriter, r *http.Request) { return } - // compare current vmss config with expected vmss config and update if needed + // 3. 
Update flow: compare current vmss config with expected vmss config and update if needed if diff := common.VmssConfigsDiff(vmssState.CurrentConfig, &vmssConfig); diff != "" { logger.Info().Msgf("vmss config changed, diff: %s", diff) - err := HandleVmssUpdate(ctx, &envParams, &vmssConfig, &vmssState) + err := HandleVmssUpdate(ctx, &scaleUpParams, &vmssConfig, &vmssState) if err != nil { common.WriteErrorResponse(w, err) return @@ -117,7 +130,7 @@ func Handler(w http.ResponseWriter, r *http.Request) { } // scale up vmss if needed - err = common.ScaleUp(ctx, envParams.SubscriptionId, envParams.ResourceGroupName, envParams.VmssName, int64(state.DesiredSize)) + err = common.ScaleUp(ctx, subscriptionId, resourceGroupName, vmssName, int64(state.DesiredSize)) if err != nil { common.WriteErrorResponse(w, err) return @@ -125,46 +138,23 @@ func Handler(w http.ResponseWriter, r *http.Request) { common.WriteSuccessResponse(w, fmt.Sprintf("updated size to %d successfully", state.DesiredSize)) } -func HandleVmssCreate(ctx context.Context, params *ScaleUpEnvParams, vmssState *common.VMSSState) error { - logger := logging.LoggerFromCtx(ctx) - logger.Info().Msgf("creating vmss %s", params.VmssName) - - functionAppName := os.Getenv("FUNCTION_APP_NAME") - - vmssConfig, err := common.ReadVmssConfig(ctx, params.VmssStateStorageName, params.StateContainerName) - if err != nil { - return err - } - - vmssId, err := common.CreateOrUpdateVmss(ctx, params.SubscriptionId, params.ResourceGroupName, params.VmssName, vmssConfig, params.DesiredSize) +func HandleVmssRefresh(ctx context.Context, params *ScaleUpParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error { + refreshVmss, err := common.GetScaleSetOrNilOnNotFound(ctx, subscriptionId, resourceGroupName, params.RefreshVmssName) if err != nil { return err } + params.RefreshVmss = refreshVmss - err = common.AssignVmssContributorRoleToFunctionApp(ctx, params.SubscriptionId, params.ResourceGroupName, *vmssId, functionAppName) - if err != nil { - err = fmt.Errorf("cannot assign vmss 'contributor' role to function app: %w", err) - return err - } - - logger.Info().Msgf("created vmss %s, updating vmss state", params.VmssName) - vmssState.VmssCreated = true - vmssState.CurrentConfig = &vmssConfig - err = common.WriteVmssState(ctx, params.VmssStateStorageName, params.StateContainerName, *vmssState) - return err -} - -func HandleVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error { if vmssState.RefreshStatus == common.RefreshNeeded { return initiateVmssRefresh(ctx, params, vmssConfig, vmssState) } else if vmssState.RefreshStatus == common.RefreshInProgress { return progressVmssRefresh(ctx, params, vmssConfig, vmssState) } else { - return fmt.Errorf("invalid refresh status: %d", vmssState.RefreshStatus) + return fmt.Errorf("invalid refresh status: %s", vmssState.RefreshStatus) } } -func HandleVmssUpdate(ctx context.Context, params *ScaleUpEnvParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error { +func HandleVmssUpdate(ctx context.Context, params *ScaleUpParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error { logger := logging.LoggerFromCtx(ctx) logger.Info().Msgf("updating vmss %s", params.VmssName) @@ -175,7 +165,7 @@ func HandleVmssUpdate(ctx context.Context, params *ScaleUpEnvParams, vmssConfig return fmt.Errorf(msg) } - _, err := common.CreateOrUpdateVmss(ctx, params.SubscriptionId, params.ResourceGroupName, params.VmssName, *vmssConfig, 
params.DesiredSize)
+	_, err := common.CreateOrUpdateVmss(ctx, subscriptionId, resourceGroupName, params.VmssName, *vmssConfig, params.DesiredSize)
 	if err != nil {
 		if updErr, ok := err.(*azcore.ResponseError); ok && updErr.ErrorCode == "PropertyChangeNotAllowed" {
 			setNeedRefreshVmssState(ctx, params, vmssState)
@@ -188,13 +178,33 @@ func HandleVmssUpdate(ctx context.Context, params *ScaleUpEnvParams, vmssConfig
 	return nil
 }

-func setNeedRefreshVmssState(ctx context.Context, params *ScaleUpEnvParams, vmssState *common.VMSSState) error {
+func createInitialVmss(ctx context.Context, vmssName string, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState, state *protocol.ClusterState) error {
+	logger := logging.LoggerFromCtx(ctx)
+
+	err := createVmss(ctx, vmssName, vmssConfig, state.DesiredSize)
+	if err != nil {
+		logger.Error().Err(err).Msgf("cannot create vmss %s", vmssName)
+		return err
+	}
+
+	logger.Info().Msgf("updating vmss state: setting current config")
+	vmssState.CurrentConfig = vmssConfig
+
+	err = common.WriteVmssState(ctx, vmssStateStorageName, stateContainerName, *vmssState)
+	if err != nil {
+		logger.Error().Err(err).Msgf("cannot update vmss state")
+		return err
+	}
+	return nil
+}
+
+func setNeedRefreshVmssState(ctx context.Context, params *ScaleUpParams, vmssState *common.VMSSState) error {
 	logger := logging.LoggerFromCtx(ctx)
 	logger.Info().Msgf("need to refresh vmss %s", params.VmssName)

 	vmssState.RefreshStatus = common.RefreshNeeded
-	err := common.WriteVmssState(ctx, params.VmssStateStorageName, params.StateContainerName, *vmssState)
+	err := common.WriteVmssState(ctx, vmssStateStorageName, stateContainerName, *vmssState)
 	if err != nil {
 		err = fmt.Errorf("cannot update vmss state: %w", err)
 		logger.Error().Err(err).Msgf("cannot update vmss %s", params.VmssName)
@@ -202,52 +212,52 @@ func setNeedRefreshVmssState(ctx context.Context, params *ScaleUpEnvParams, vmss
 	return err
 }

-func initiateVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error {
+func initiateVmssRefresh(ctx context.Context, params *ScaleUpParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error {
 	// Make sure that vmss current size is equal to "desired" number of weka instances
 	logger := logging.LoggerFromCtx(ctx)
-	logger.Info().Msgf("starting vmss refresh for %s", params.VmssName)
-
-	newVmssName := params.RefreshVmssName
-	newVmssSize := 0
-	// if public ip address is assigned to vmss, domainNameLabel should differ (avoid VMScaleSetDnsRecordsInUse error)
-	for i := range vmssConfig.PrimaryNIC.IPConfigurations {
-		newDnsLabelName := fmt.Sprintf("%s-v%d", vmssConfig.PrimaryNIC.IPConfigurations[i].PublicIPAddress.DomainNameLabel, vmssState.VmssVersion+1)
-		vmssConfig.PrimaryNIC.IPConfigurations[i].PublicIPAddress.DomainNameLabel = newDnsLabelName
-	}
-	// update hostname prefix
-	vmssConfig.ComputerNamePrefix = fmt.Sprintf("%s-v%d", vmssConfig.ComputerNamePrefix, vmssState.VmssVersion+1)
+	if params.RefreshVmss == nil {
+		logger.Info().Msgf("starting vmss refresh for %s", params.VmssName)

-	logger.Info().Msgf("creating new vmss %s of size %d", newVmssName, newVmssSize)
+		newVmssName := params.RefreshVmssName
+		newVmssSize := 0
+		// if public ip address is assigned to vmss, domainNameLabel should differ (avoid VMScaleSetDnsRecordsInUse error)
+		for i := range vmssConfig.PrimaryNIC.IPConfigurations {
+			newDnsLabelName := fmt.Sprintf("%s-v%d", vmssConfig.PrimaryNIC.IPConfigurations[i].PublicIPAddress.DomainNameLabel, 
vmssState.VmssVersion+1)
+			vmssConfig.PrimaryNIC.IPConfigurations[i].PublicIPAddress.DomainNameLabel = newDnsLabelName
+		}

-	_, err := common.CreateOrUpdateVmss(ctx, params.SubscriptionId, params.ResourceGroupName, newVmssName, *vmssConfig, newVmssSize)
-	if err != nil {
-		err = fmt.Errorf("cannot create new vmss: %w", err)
-		logger.Error().Err(err).Msgf("cannot create 'refresh' vmss %s", params.VmssName)
-		return err
+		// update hostname prefix
+		vmssConfig.ComputerNamePrefix = fmt.Sprintf("%s-v%d", vmssConfig.ComputerNamePrefix, vmssState.VmssVersion+1)
+
+		logger.Info().Msgf("creating new vmss %s of size %d", newVmssName, newVmssSize)
+
+		err := createVmss(ctx, newVmssName, vmssConfig, newVmssSize)
+		if err != nil {
+			logger.Error().Err(err).Msgf("cannot create 'refresh' vmss %s", params.VmssName)
+			return err
+		}
 	}

-	logger.Info().Msgf("created new vmss %s, updating vmss state", params.VmssName)
+	logger.Info().Msgf("updating vmss state to 'refresh in progress'")
 	vmssState.RefreshStatus = common.RefreshInProgress
 	vmssState.CurrentConfig = vmssConfig
-	err = common.WriteVmssState(ctx, params.VmssStateStorageName, params.StateContainerName, *vmssState)
+	err := common.WriteVmssState(ctx, vmssStateStorageName, stateContainerName, *vmssState)
 	return err
 }

-func progressVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error {
+func progressVmssRefresh(ctx context.Context, params *ScaleUpParams, vmssConfig *common.VMSSConfig, vmssState *common.VMSSState) error {
 	// Terminology:
 	// "Outdated" vmss -- vmss that was used before refresh
 	// "Refresh" vmss -- vmss that was created during refresh
 	// "desired" number of weka instances -- number of weka instances expected by the user (stored in state)
 	// note: "desired" number of weka instances should be the same as Outdated vmss size
-	// "temporarily expected" number of weka instances = desired number of weka instances + Refresh vmss size
 	//
 	// Algorithm:
 	// 1. check current size of Refresh vmss
 	// 2. check total number of weka instances in the weka cluster (Outdated vmss size + Refresh vmss size)
-	// 3. if number of weka instances is equal to "temporarily expected" number of weka instances, then:
-	//    - scale down Outdated vmss using the following formula:
-	//      desired number of weka instances - Refresh vmss size
+	// 3. once instances of the Refresh vmss have joined the weka cluster, then
+	//    - the old instances in the Outdated vmss will be removed automatically by the scale_down workflow
 	// 4. if Refresh vmss size is less than desired number of weka instances
 	//    and Outdated vmss size == (desired number of weka instances - Refresh vmss size), then:
 	//    - scale up Refresh vmss to size defined by the 'calculateNewVmssSize' function
 	// 5. ...
 	//    - scale down Outdated vmss to 0
 	// 6. 
if Outdated vmss size is 0, then: // - delete Outdated vmss - // - rename Refresh vmss to Outdated vmss' name + // - set new vmss version // - update vmss state logger := logging.LoggerFromCtx(ctx) logger.Info().Msgf("progressing vmss refresh for %s", params.VmssName) - refreshVmssSize, err := common.GetScaleSetSize(ctx, params.SubscriptionId, params.ResourceGroupName, params.RefreshVmssName) - if err != nil { - err = fmt.Errorf("cannot get refresh vmss size: %w", err) + if params.RefreshVmss == nil || params.RefreshVmss.SKU.Capacity == nil { + err := fmt.Errorf("refresh vmss %s should exist and have capacity set", params.RefreshVmssName) logger.Error().Err(err).Send() return err } - outdatedVmssSize, err := common.GetScaleSetSize(ctx, params.SubscriptionId, params.ResourceGroupName, params.VmssName) - if err != nil { - err = fmt.Errorf("cannot get outdated vmss size: %w", err) + if params.Vmss == nil || params.Vmss.SKU.Capacity == nil { + err := fmt.Errorf("outdated vmss %s should exist and have capacity set", params.VmssName) logger.Error().Err(err).Send() return err } + refreshVmssSize := int(*params.RefreshVmss.SKU.Capacity) + outdatedVmssSize := int(*params.Vmss.SKU.Capacity) + if outdatedVmssSize == params.DesiredSize-refreshVmssSize && outdatedVmssSize != 0 { - newSize := calculateRefreshVmssSize(refreshVmssSize, params.DesiredSize) + newSize := calculateNewVmssSize(refreshVmssSize, params.DesiredSize) logger.Info().Msgf("scaling up refresh vmss %s from %d to %d", params.RefreshVmssName, refreshVmssSize, newSize) - err = common.ScaleUp(ctx, params.SubscriptionId, params.ResourceGroupName, params.RefreshVmssName, int64(newSize)) + err := common.ScaleUp(ctx, subscriptionId, resourceGroupName, params.RefreshVmssName, int64(newSize)) if err != nil { err = fmt.Errorf("cannot scale up refresh vmss: %w", err) logger.Error().Err(err).Send() @@ -287,16 +298,9 @@ func progressVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConf return nil } - // wekaClusterStatus, err := status.GetClusterStatus(ctx, params.SubscriptionId, params.ResourceGroupName, params.VmssName, params.StateStorageName, params.StateContainerName, params.KeyVaultUri, params.RefreshVmssName) - // if err != nil { - // err = fmt.Errorf("cannot get weka cluster status: %w", err) - // logger.Error().Err(err).Send() - // return err - // } - if outdatedVmssSize == 0 { logger.Info().Msgf("deleting outdated vmss %s", params.VmssName) - err = common.DeleteVmss(ctx, params.SubscriptionId, params.ResourceGroupName, params.VmssName) + err := common.DeleteVmss(ctx, subscriptionId, resourceGroupName, params.VmssName) if err != nil { err = fmt.Errorf("cannot delete outdated vmss: %w", err) logger.Error().Err(err).Send() @@ -307,7 +311,7 @@ func progressVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConf vmssState.RefreshStatus = common.RefreshNone vmssState.VmssVersion = vmssState.VmssVersion + 1 vmssState.CurrentConfig = vmssConfig - err = common.WriteVmssState(ctx, params.VmssStateStorageName, params.StateContainerName, *vmssState) + err = common.WriteVmssState(ctx, vmssStateStorageName, stateContainerName, *vmssState) if err != nil { err = fmt.Errorf("cannot update vmss state: %w", err) logger.Error().Err(err).Send() @@ -319,12 +323,31 @@ func progressVmssRefresh(ctx context.Context, params *ScaleUpEnvParams, vmssConf return nil } -func calculateRefreshVmssSize(current, expected int) int { +func calculateNewVmssSize(current, expected int) int { if expected <= current { return expected } - if 
expected-current < RefreshVmssInstancesAddingStep { + if expected-current < instancesAddingStep { return expected } - return current + RefreshVmssInstancesAddingStep + return current + instancesAddingStep +} + +func createVmss(ctx context.Context, vmssName string, vmssConfig *common.VMSSConfig, vmssSize int) error { + logger := logging.LoggerFromCtx(ctx) + logger.Info().Msgf("creating vmss %s", vmssName) + + vmssId, err := common.CreateOrUpdateVmss(ctx, subscriptionId, resourceGroupName, vmssName, *vmssConfig, vmssSize) + if err != nil { + return err + } + + err = common.AssignVmssContributorRoleToFunctionApp(ctx, subscriptionId, resourceGroupName, *vmssId, functionAppName) + if err != nil { + err = fmt.Errorf("cannot assign vmss 'contributor' role to function app: %w", err) + return err + } + + logger.Info().Msgf("created vmss %s", vmssName) + return nil } diff --git a/function-app/code/functions/status/status.go b/function-app/code/functions/status/status.go index 02e9bd0a..db2f9fc9 100644 --- a/function-app/code/functions/status/status.go +++ b/function-app/code/functions/status/status.go @@ -174,9 +174,9 @@ func Handler(w http.ResponseWriter, r *http.Request) { refreshVmssName = "" } result = common.VMSSStateVerbose{ - VmssCreated: vmssState.VmssCreated, + VmssCreated: vmssState.CurrentConfig != nil, VmssName: vmScaleSetName, - RefreshStatus: vmssState.RefreshStatus.String(), + RefreshStatus: string(vmssState.RefreshStatus), RefreshVmssName: refreshVmssName, CurrentConfig: vmssState.CurrentConfig, } diff --git a/functions.tf b/functions.tf index c80a5271..c1e2176b 100644 --- a/functions.tf +++ b/functions.tf @@ -163,6 +163,7 @@ resource "azurerm_linux_function_app" "function_app" { FUNCTION_APP_NAME = local.function_app_name PROXY_URL = var.proxy_url WEKA_HOME_URL = var.weka_home_url + VMSS_INSTANCES_ADDING_STEP = var.vmss_instances_adding_step https_only = true FUNCTIONS_EXTENSION_VERSION = "~4" diff --git a/variables.tf b/variables.tf index f5c39d54..40712deb 100644 --- a/variables.tf +++ b/variables.tf @@ -321,7 +321,7 @@ variable "function_app_storage_account_container_prefix" { variable "function_app_version" { type = string description = "Function app code version (hash)" - default = "60ec8c21cc5fd2b21d698fa9bb9e69ac" + default = "4d9c02b868f846b7c3fdd59141d4d401" } variable "function_app_dist" { @@ -603,3 +603,14 @@ variable "function_access_restriction_enabled" { default = false description = "Allow public access, Access restrictions apply to inbound access to internal vent" } + +variable "vmss_instances_adding_step" { + type = number + default = 3 + description = "Number of instances to add to vmss in one iteration" + + validation { + condition = var.vmss_instances_adding_step >= 1 + error_message = "Instances adding step should be greater than 0" + } +}