diff --git a/go.mod b/go.mod index b9ad84e..eeccdf2 100644 --- a/go.mod +++ b/go.mod @@ -69,4 +69,5 @@ require ( google.golang.org/grpc v1.56.3 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect ) diff --git a/go.sum b/go.sum index 4878009..a2a7adf 100644 --- a/go.sum +++ b/go.sum @@ -324,3 +324,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= +k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 42b59cc..fd07e29 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -142,10 +142,9 @@ func resourcePulsarNamespace() *schema.Resource { Set: hashBacklogQuotaSubset(), }, "namespace_config": { - Type: schema.TypeSet, + Type: schema.TypeList, Optional: true, Description: descriptions["namespace_config"], - MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "anti_affinity": { @@ -156,30 +155,31 @@ func resourcePulsarNamespace() *schema.Resource { "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, - Default: -1, + Default: 0, ValidateFunc: validateGtEq0, }, "max_consumers_per_topic": { Type: schema.TypeInt, Optional: true, - Default: -1, + Default: 0, ValidateFunc: validateGtEq0, }, "max_producers_per_topic": { Type: schema.TypeInt, Optional: true, - Default: -1, + Default: 0, ValidateFunc: validateGtEq0, }, "message_ttl_seconds": { Type: schema.TypeInt, Optional: true, - Default: -1, + Default: 0, ValidateFunc: validateGtEq0, }, "replication_clusters": { Type: schema.TypeList, Optional: true, + Computed: true, MinItems: 1, Elem: &schema.Schema{ Type: schema.TypeString, @@ -204,12 +204,11 @@ func resourcePulsarNamespace() *schema.Resource { "offload_threshold_size_in_mb": { Type: schema.TypeInt, Optional: true, - Default: -1, + Default: 0, ValidateFunc: validateGtEq0, }, }, }, - Set: namespaceConfigToHash, }, "persistence_policies": { Type: schema.TypeSet, @@ -332,76 +331,87 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("namespace", namespace) _ = d.Set("tenant", tenant) - if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 { + if _, ok := d.GetOk("namespace_config"); ok { + var namespaceConfig = make(map[string]interface{}) afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) + } else { + namespaceConfig["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"") } maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) + } else { + namespaceConfig["max_consumers_per_subscription"] = maxConsPerSub } maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) + } else { + namespaceConfig["max_consumers_per_topic"] = maxConsPerTopic } maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + } else { + namespaceConfig["max_producers_per_topic"] = maxProdPerTopic } messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) + } else { + namespaceConfig["message_ttl_seconds"] = messageTTL } schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) + } else { + namespaceConfig["schema_validation_enforce"] = schemaValidationEnforce } schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + if !strings.Contains(err.Error(), "Invalid auth strategy") { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + } + } else { + namespaceConfig["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() } replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) - } - - replClusters := make([]interface{}, len(replClustersRaw)) - for i, cl := range replClustersRaw { - replClusters[i] = cl + } else { + replClusters := make([]interface{}, len(replClustersRaw)) + for i, cl := range replClustersRaw { + replClusters[i] = cl + } + namespaceConfig["replication_clusters"] = replClusters } isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + } else { + namespaceConfig["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema } offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) + } else { + namespaceConfig["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) } - _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{ - map[string]interface{}{ - "anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""), - "max_consumers_per_subscription": maxConsPerSub, - "max_consumers_per_topic": maxConsPerTopic, - "max_producers_per_topic": maxProdPerTopic, - "message_ttl_seconds": messageTTL, - "replication_clusters": replClusters, - "schema_validation_enforce": schemaValidationEnforce, - "schema_compatibility_strategy": schemaCompatibilityStrategy.String(), - "is_allow_auto_update_schema": isAllowAutoUpdateSchema, - "offload_threshold_size_in_mb": int(offloadTresholdSizeInMb), - }, - })) + _ = d.Set("namespace_config", []interface{}{ + namespaceConfig, + }) } if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 { @@ -518,7 +528,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, namespace := d.Get("namespace").(string) tenant := d.Get("tenant").(string) enableDeduplication, deduplicationDefined := d.GetOk("enable_deduplication") - namespaceConfig := d.Get("namespace_config").(*schema.Set) + namespaceConfig := d.Get("namespace_config").([]interface{}) retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set) backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set) dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) @@ -534,8 +544,8 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, var errs error - if namespaceConfig.Len() > 0 { - nsCfg := unmarshalNamespaceConfig(namespaceConfig) + if len(namespaceConfig) > 0 { + nsCfg := unmarshalNamespaceConfigList(namespaceConfig) if len(nsCfg.AntiAffinity) > 0 { if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), nsCfg.AntiAffinity); err != nil { @@ -746,23 +756,6 @@ func retentionPoliciesToHash(v interface{}) int { return hashcode.String(buf.String()) } -func namespaceConfigToHash(v interface{}) int { - var buf bytes.Buffer - m := v.(map[string]interface{}) - - buf.WriteString(fmt.Sprintf("%s-", m["anti_affinity"].(string))) - buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_subscription"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_topic"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["max_producers_per_topic"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["message_ttl_seconds"].(int))) - buf.WriteString(fmt.Sprintf("%s-", m["replication_clusters"].([]interface{}))) - buf.WriteString(fmt.Sprintf("%t-", m["schema_validation_enforce"].(bool))) - buf.WriteString(fmt.Sprintf("%s-", m["schema_compatibility_strategy"].(string))) - buf.WriteString(fmt.Sprintf("%d-", m["offload_threshold_size_in_mb"].(int))) - - return hashcode.String(buf.String()) -} - func persistencePoliciesToHash(v interface{}) int { var buf bytes.Buffer m := v.(map[string]interface{}) @@ -819,10 +812,10 @@ func unmarshalRetentionPolicies(v *schema.Set) *utils.RetentionPolicies { return &rtnPolicies } -func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig { +func unmarshalNamespaceConfigList(v []interface{}) *types.NamespaceConfig { var nsConfig types.NamespaceConfig - for _, ns := range v.List() { + for _, ns := range v { data := ns.(map[string]interface{}) rplClusters := data["replication_clusters"].([]interface{}) diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 1f15fd0..b792833 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -184,6 +184,15 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) { resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"), resource.TestCheckNoResourceAttr(resourceName, "permission_grant.#"), ), + }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, ExpectNonEmptyPlan: true, }, }, @@ -391,6 +400,41 @@ func TestNamespaceExternallyRemoved(t *testing.T) { }) } +func TestNamespaceWithUndefinedOptionalsDrift(t *testing.T) { + + resourceName := "pulsar_namespace.test" + cName := acctest.RandString(10) + tName := acctest.RandString(10) + nsName := acctest.RandString(10) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + IDRefreshName: resourceName, + CheckDestroy: testPulsarNamespaceDestroy, + Steps: []resource.TestStep{ + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + ), + }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + ), + ExpectNonEmptyPlan: false, + }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, + }, + }) +} + func createNamespace(t *testing.T, id string) { client, err := sharedClient(testWebServiceURL) if err != nil { diff --git a/pulsar/resource_pulsar_sink_test.go b/pulsar/resource_pulsar_sink_test.go index 5d1784c..55752e3 100644 --- a/pulsar/resource_pulsar_sink_test.go +++ b/pulsar/resource_pulsar_sink_test.go @@ -254,7 +254,8 @@ func TestSinkUpdate(t *testing.T) { t.Fatal(err) } configString := string(configBytes) - configString = strings.ReplaceAll(configString, "sink-1", "update-sink-test-1") + newName := "sink" + acctest.RandString(10) + configString = strings.ReplaceAll(configString, "sink-1", newName) resource.Test(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, @@ -265,7 +266,7 @@ func TestSinkUpdate(t *testing.T) { { Config: configString, Check: resource.ComposeTestCheckFunc(func(s *terraform.State) error { - name := "pulsar_sink.update-sink-test-1" + name := "pulsar_sink." + newName rs, ok := s.RootModule().Resources[name] if !ok { return fmt.Errorf("%s not be found", name)