From 2592c53b50be8e02224bc06b78a803b9783116bc Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 8 Jan 2025 14:26:16 +0800 Subject: [PATCH 1/9] fix namespace config drift --- pulsar/resource_pulsar_namespace.go | 19 ++++++++------- pulsar/resource_pulsar_namespace_test.go | 30 ++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 42b59cc..cdc9528 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -70,6 +70,7 @@ func resourcePulsarNamespace() *schema.Resource { "enable_deduplication": { Type: schema.TypeBool, Optional: true, + Computed: true, }, "dispatch_rate": { Type: schema.TypeSet, @@ -152,30 +153,31 @@ func resourcePulsarNamespace() *schema.Resource { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, + Computed: true, }, "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, - Default: -1, ValidateFunc: validateGtEq0, + Computed: true, }, "max_consumers_per_topic": { Type: schema.TypeInt, Optional: true, - Default: -1, ValidateFunc: validateGtEq0, + Computed: true, }, "max_producers_per_topic": { Type: schema.TypeInt, Optional: true, - Default: -1, ValidateFunc: validateGtEq0, + Computed: true, }, "message_ttl_seconds": { Type: schema.TypeInt, Optional: true, - Default: -1, ValidateFunc: validateGtEq0, + Computed: true, }, "replication_clusters": { Type: schema.TypeList, @@ -184,28 +186,29 @@ func resourcePulsarNamespace() *schema.Resource { Elem: &schema.Schema{ Type: schema.TypeString, }, + Computed: true, }, "schema_validation_enforce": { Type: schema.TypeBool, Optional: true, - Default: false, + Computed: true, }, "schema_compatibility_strategy": { Type: schema.TypeString, Optional: true, - Default: "Full", ValidateFunc: validateNotBlank, + Computed: true, }, "is_allow_auto_update_schema": { Type: schema.TypeBool, Optional: true, - Default: true, + Computed: true, }, "offload_threshold_size_in_mb": { Type: schema.TypeInt, Optional: true, - Default: -1, ValidateFunc: validateGtEq0, + Computed: true, }, }, }, diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 1f15fd0..f967aed 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -391,6 +391,36 @@ func TestNamespaceExternallyRemoved(t *testing.T) { }) } +func TestNamespaceWithUndefinedOptionalsDrift(t *testing.T) { + + resourceName := "pulsar_namespace.test" + cName := acctest.RandString(10) + tName := acctest.RandString(10) + nsName := acctest.RandString(10) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + IDRefreshName: resourceName, + CheckDestroy: testPulsarNamespaceDestroy, + Steps: []resource.TestStep{ + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + ), + }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + ), + ExpectNonEmptyPlan: false, + }, + }, + }) +} + func createNamespace(t *testing.T, id string) { client, err := sharedClient(testWebServiceURL) if err != nil { From ec143af79efb14edf4f0225afeb590cd89df0a95 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 8 Jan 2025 14:53:35 +0800 Subject: [PATCH 2/9] fix ci --- pulsar/resource_pulsar_namespace.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index cdc9528..1cee5ca 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -197,7 +197,7 @@ func resourcePulsarNamespace() *schema.Resource { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, - Computed: true, + Default: "Full", }, "is_allow_auto_update_schema": { Type: schema.TypeBool, From 6713f4f9b1270d287c221d150b96d5f4445692f1 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 8 Jan 2025 16:31:39 +0800 Subject: [PATCH 3/9] fix ci --- pulsar/resource_pulsar_namespace.go | 125 +++++++++++++++------------- 1 file changed, 68 insertions(+), 57 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 1cee5ca..210a590 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -153,31 +153,26 @@ func resourcePulsarNamespace() *schema.Resource { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, - Computed: true, }, "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, - Computed: true, }, "max_consumers_per_topic": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, - Computed: true, }, "max_producers_per_topic": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, - Computed: true, }, "message_ttl_seconds": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, - Computed: true, }, "replication_clusters": { Type: schema.TypeList, @@ -186,12 +181,10 @@ func resourcePulsarNamespace() *schema.Resource { Elem: &schema.Schema{ Type: schema.TypeString, }, - Computed: true, }, "schema_validation_enforce": { Type: schema.TypeBool, Optional: true, - Computed: true, }, "schema_compatibility_strategy": { Type: schema.TypeString, @@ -202,13 +195,11 @@ func resourcePulsarNamespace() *schema.Resource { "is_allow_auto_update_schema": { Type: schema.TypeBool, Optional: true, - Computed: true, }, "offload_threshold_size_in_mb": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, - Computed: true, }, }, }, @@ -336,75 +327,95 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("tenant", tenant) if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 { - afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) - } + configData := namespaceConfig.(*schema.Set).List()[0].(map[string]interface{}) + data := make(map[string]interface{}) - maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) + if _, ok := configData["anti_affinity"]; ok { + afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) + } + data["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"") } - maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) + if _, ok := configData["max_consumers_per_subscription"]; ok { + maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) + } + data["max_consumers_per_subscription"] = maxConsPerSub } - maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + if _, ok := configData["max_consumers_per_topic"]; ok { + maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) + } + data["max_consumers_per_topic"] = maxConsPerTopic } - messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) + if _, ok := configData["max_producers_per_topic"]; ok { + maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + } + data["max_producers_per_topic"] = maxProdPerTopic } - schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) + if _, ok := configData["message_ttl_seconds"]; ok { + messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) + } + data["message_ttl_seconds"] = messageTTL } - schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + if _, ok := configData["schema_validation_enforce"]; ok { + schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) + } + data["schema_validation_enforce"] = schemaValidationEnforce } - replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + if _, ok := configData["schema_compatibility_strategy"]; ok { + schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + } + data["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() } - replClusters := make([]interface{}, len(replClustersRaw)) - for i, cl := range replClustersRaw { - replClusters[i] = cl + if _, ok := configData["replication_clusters"]; ok { + replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + } + + replClusters := make([]interface{}, len(replClustersRaw)) + for i, cl := range replClustersRaw { + replClusters[i] = cl + } + data["replication_clusters"] = replClusters } - isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + if _, ok := configData["is_allow_auto_update_schema"]; ok { + isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + } + data["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema } - offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) + if _, ok := configData["offload_threshold_size_in_mb"]; ok { + offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) + } + data["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) } - _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{ - map[string]interface{}{ - "anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""), - "max_consumers_per_subscription": maxConsPerSub, - "max_consumers_per_topic": maxConsPerTopic, - "max_producers_per_topic": maxProdPerTopic, - "message_ttl_seconds": messageTTL, - "replication_clusters": replClusters, - "schema_validation_enforce": schemaValidationEnforce, - "schema_compatibility_strategy": schemaCompatibilityStrategy.String(), - "is_allow_auto_update_schema": isAllowAutoUpdateSchema, - "offload_threshold_size_in_mb": int(offloadTresholdSizeInMb), - }, - })) + _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{data})) } if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 { From 7193e522464dfedf454136430cd91a9fb00d9d50 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 8 Jan 2025 17:31:52 +0800 Subject: [PATCH 4/9] fix ci --- pulsar/resource_pulsar_namespace.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 210a590..7c0b7b0 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -181,6 +181,11 @@ func resourcePulsarNamespace() *schema.Resource { Elem: &schema.Schema{ Type: schema.TypeString, }, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + // If the field is not set in config, suppress the diff + _, exists := d.GetOk("namespace_config.0.replication_clusters") + return !exists + }, }, "schema_validation_enforce": { Type: schema.TypeBool, @@ -191,6 +196,11 @@ func resourcePulsarNamespace() *schema.Resource { Optional: true, ValidateFunc: validateNotBlank, Default: "Full", + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + // If the field is not set in config, suppress the diff + _, exists := d.GetOk("namespace_config.0.schema_compatibility_strategy") + return !exists + }, }, "is_allow_auto_update_schema": { Type: schema.TypeBool, @@ -392,11 +402,14 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) } - replClusters := make([]interface{}, len(replClustersRaw)) - for i, cl := range replClustersRaw { - replClusters[i] = cl + // Only set replication_clusters if it was explicitly configured + if len(replClustersRaw) > 0 { + replClusters := make([]interface{}, len(replClustersRaw)) + for i, cl := range replClustersRaw { + replClusters[i] = cl + } + data["replication_clusters"] = replClusters } - data["replication_clusters"] = replClusters } if _, ok := configData["is_allow_auto_update_schema"]; ok { From 91f7ddd11d3cb6e6183db2c6d40aaf3459d67cfd Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 8 Jan 2025 23:57:13 +0800 Subject: [PATCH 5/9] stash --- go.mod | 1 + go.sum | 2 + pulsar/resource_pulsar_namespace.go | 274 ++++++++++++++-------------- types/types.go | 18 +- 4 files changed, 154 insertions(+), 141 deletions(-) diff --git a/go.mod b/go.mod index b9ad84e..eeccdf2 100644 --- a/go.mod +++ b/go.mod @@ -69,4 +69,5 @@ require ( google.golang.org/grpc v1.56.3 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect ) diff --git a/go.sum b/go.sum index 4878009..a2a7adf 100644 --- a/go.sum +++ b/go.sum @@ -324,3 +324,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= +k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 7c0b7b0..b9a0582 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -31,6 +31,7 @@ import ( "github.com/streamnative/terraform-provider-pulsar/hashcode" "github.com/streamnative/terraform-provider-pulsar/types" + "k8s.io/utils/ptr" ) func resourcePulsarNamespace() *schema.Resource { @@ -153,26 +154,36 @@ func resourcePulsarNamespace() *schema.Resource { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, + Computed: true, + Default: nil, }, "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, + Computed: true, + Default: nil, }, "max_consumers_per_topic": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, + Computed: true, + Default: nil, }, "max_producers_per_topic": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, + Computed: true, + Default: nil, }, "message_ttl_seconds": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, + Computed: true, + Default: nil, }, "replication_clusters": { Type: schema.TypeList, @@ -181,35 +192,33 @@ func resourcePulsarNamespace() *schema.Resource { Elem: &schema.Schema{ Type: schema.TypeString, }, - DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { - // If the field is not set in config, suppress the diff - _, exists := d.GetOk("namespace_config.0.replication_clusters") - return !exists - }, + Computed: true, + Default: nil, }, "schema_validation_enforce": { Type: schema.TypeBool, Optional: true, + Computed: true, + Default: nil, }, "schema_compatibility_strategy": { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, Default: "Full", - DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { - // If the field is not set in config, suppress the diff - _, exists := d.GetOk("namespace_config.0.schema_compatibility_strategy") - return !exists - }, }, "is_allow_auto_update_schema": { Type: schema.TypeBool, Optional: true, + Computed: true, + Default: nil, }, "offload_threshold_size_in_mb": { Type: schema.TypeInt, Optional: true, ValidateFunc: validateGtEq0, + Computed: true, + Default: nil, }, }, }, @@ -337,98 +346,75 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("tenant", tenant) if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 { - configData := namespaceConfig.(*schema.Set).List()[0].(map[string]interface{}) - data := make(map[string]interface{}) - - if _, ok := configData["anti_affinity"]; ok { - afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) - } - data["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"") + afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) } - if _, ok := configData["max_consumers_per_subscription"]; ok { - maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) - } - data["max_consumers_per_subscription"] = maxConsPerSub + maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) } - if _, ok := configData["max_consumers_per_topic"]; ok { - maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) - } - data["max_consumers_per_topic"] = maxConsPerTopic + maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) } - if _, ok := configData["max_producers_per_topic"]; ok { - maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) - } - data["max_producers_per_topic"] = maxProdPerTopic + maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) } - if _, ok := configData["message_ttl_seconds"]; ok { - messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) - } - data["message_ttl_seconds"] = messageTTL + messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) } - if _, ok := configData["schema_validation_enforce"]; ok { - schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) - } - data["schema_validation_enforce"] = schemaValidationEnforce + schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) } - if _, ok := configData["schema_compatibility_strategy"]; ok { - schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) - } - data["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() + schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) } - if _, ok := configData["replication_clusters"]; ok { - replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) - } + replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + } - // Only set replication_clusters if it was explicitly configured - if len(replClustersRaw) > 0 { - replClusters := make([]interface{}, len(replClustersRaw)) - for i, cl := range replClustersRaw { - replClusters[i] = cl - } - data["replication_clusters"] = replClusters - } + replClusters := make([]interface{}, len(replClustersRaw)) + for i, cl := range replClustersRaw { + replClusters[i] = cl } - if _, ok := configData["is_allow_auto_update_schema"]; ok { - isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) - } - data["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema + isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) } - if _, ok := configData["offload_threshold_size_in_mb"]; ok { - offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) - } - data["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) + offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) } - _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{data})) + _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{ + map[string]interface{}{ + "anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""), + "max_consumers_per_subscription": maxConsPerSub, + "max_consumers_per_topic": maxConsPerTopic, + "max_producers_per_topic": maxProdPerTopic, + "message_ttl_seconds": messageTTL, + "replication_clusters": replClusters, + "schema_validation_enforce": schemaValidationEnforce, + "schema_compatibility_strategy": schemaCompatibilityStrategy.String(), + "is_allow_auto_update_schema": isAllowAutoUpdateSchema, + "offload_threshold_size_in_mb": int(offloadTresholdSizeInMb), + }, + })) } if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 { @@ -561,65 +547,71 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, var errs error - if namespaceConfig.Len() > 0 { - nsCfg := unmarshalNamespaceConfig(namespaceConfig) + if d.HasChange("namespace_config") { + if namespaceConfig.Len() > 0 { + nsCfg := unmarshalNamespaceConfig(namespaceConfig) - if len(nsCfg.AntiAffinity) > 0 { - if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), nsCfg.AntiAffinity); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceAntiAffinityGroup: %w", err)) + if nsCfg.AntiAffinity != nil && len(*nsCfg.AntiAffinity) > 0 { + if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), *nsCfg.AntiAffinity); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceAntiAffinityGroup: %w", err)) + } } - } - if len(nsCfg.ReplicationClusters) > 0 { - if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) + if len(nsCfg.ReplicationClusters) > 0 { + if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) + } } - } - if nsCfg.MaxConsumersPerTopic >= 0 { - if err = client.SetMaxConsumersPerTopic(*nsName, nsCfg.MaxConsumersPerTopic); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerTopic: %w", err)) + if nsCfg.MaxConsumersPerTopic != nil && *nsCfg.MaxConsumersPerTopic >= 0 { + if err = client.SetMaxConsumersPerTopic(*nsName, *nsCfg.MaxConsumersPerTopic); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerTopic: %w", err)) + } } - } - if nsCfg.MaxConsumersPerSubscription >= 0 { - if err = client.SetMaxConsumersPerSubscription(*nsName, nsCfg.MaxConsumersPerSubscription); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerSubscription: %w", err)) + if nsCfg.MaxConsumersPerSubscription != nil && *nsCfg.MaxConsumersPerSubscription >= 0 { + if err = client.SetMaxConsumersPerSubscription(*nsName, *nsCfg.MaxConsumersPerSubscription); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerSubscription: %w", err)) + } } - } - if nsCfg.MaxProducersPerTopic >= 0 { - if err = client.SetMaxProducersPerTopic(*nsName, nsCfg.MaxProducersPerTopic); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxProducersPerTopic: %w", err)) + if nsCfg.MaxProducersPerTopic != nil && *nsCfg.MaxProducersPerTopic >= 0 { + if err = client.SetMaxProducersPerTopic(*nsName, *nsCfg.MaxProducersPerTopic); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxProducersPerTopic: %w", err)) + } } - } - if nsCfg.MessageTTLInSeconds >= 0 { - if err = client.SetNamespaceMessageTTL(nsName.String(), nsCfg.MessageTTLInSeconds); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceMessageTTL: %w", err)) + if nsCfg.MessageTTLInSeconds != nil && *nsCfg.MessageTTLInSeconds >= 0 { + if err = client.SetNamespaceMessageTTL(nsName.String(), *nsCfg.MessageTTLInSeconds); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceMessageTTL: %w", err)) + } } - } - if nsCfg.OffloadThresholdSizeInMb >= 0 { - if err = client.SetOffloadThreshold(*nsName, int64(nsCfg.OffloadThresholdSizeInMb)); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetOffloadThreshold: %w", err)) + if nsCfg.OffloadThresholdSizeInMb != nil && *nsCfg.OffloadThresholdSizeInMb >= 0 { + if err = client.SetOffloadThreshold(*nsName, int64(*nsCfg.OffloadThresholdSizeInMb)); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetOffloadThreshold: %w", err)) + } } - } - if err = client.SetSchemaValidationEnforced(*nsName, nsCfg.SchemaValidationEnforce); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) - } + if nsCfg.SchemaValidationEnforce != nil { + if err = client.SetSchemaValidationEnforced(*nsName, *nsCfg.SchemaValidationEnforce); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) + } + } - if len(nsCfg.SchemaCompatibilityStrategy) > 0 { - strategy, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(nsCfg.SchemaCompatibilityStrategy) - if err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) - } else if err = client.SetSchemaAutoUpdateCompatibilityStrategy(*nsName, strategy); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) + if nsCfg.SchemaCompatibilityStrategy != nil && len(*nsCfg.SchemaCompatibilityStrategy) > 0 { + strategy, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(*nsCfg.SchemaCompatibilityStrategy) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) + } else if err = client.SetSchemaAutoUpdateCompatibilityStrategy(*nsName, strategy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) + } + } + if nsCfg.IsAllowAutoUpdateSchema != nil { + if err = client.SetIsAllowAutoUpdateSchema(*nsName, *nsCfg.IsAllowAutoUpdateSchema); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) + } } - } - if err = client.SetIsAllowAutoUpdateSchema(*nsName, nsCfg.IsAllowAutoUpdateSchema); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) } } @@ -854,15 +846,33 @@ func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig { rplClusters := data["replication_clusters"].([]interface{}) nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) - nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int) - nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int) - nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int) - nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int) - nsConfig.AntiAffinity = data["anti_affinity"].(string) - nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) - nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) - nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) - nsConfig.OffloadThresholdSizeInMb = data["offload_threshold_size_in_mb"].(int) + if v, has := data["max_producers_per_topic"]; has { + nsConfig.MaxProducersPerTopic = ptr.To(v.(int)) + } + if v, has := data["max_consumers_per_topic"]; has { + nsConfig.MaxConsumersPerTopic = ptr.To(v.(int)) + } + if v, has := data["max_consumers_per_subscription"]; has { + nsConfig.MaxConsumersPerSubscription = ptr.To(v.(int)) + } + if v, has := data["message_ttl_seconds"]; has { + nsConfig.MessageTTLInSeconds = ptr.To(v.(int)) + } + if v, has := data["anti_affinity"]; has { + nsConfig.AntiAffinity = ptr.To(v.(string)) + } + if v, has := data["schema_validation_enforce"]; has { + nsConfig.SchemaValidationEnforce = ptr.To(v.(bool)) + } + if v, has := data["schema_compatibility_strategy"]; has { + nsConfig.SchemaCompatibilityStrategy = ptr.To(v.(string)) + } + if v, has := data["is_allow_auto_update_schema"]; has { + nsConfig.IsAllowAutoUpdateSchema = ptr.To(v.(bool)) + } + if v, has := data["offload_threshold_size_in_mb"]; has { + nsConfig.OffloadThresholdSizeInMb = ptr.To(v.(int)) + } } return &nsConfig diff --git a/types/types.go b/types/types.go index 18a3296..fe66509 100644 --- a/types/types.go +++ b/types/types.go @@ -24,16 +24,16 @@ import ( type ( // configurable features of the Pulsar Namespace Entity via Terraform NamespaceConfig struct { - AntiAffinity string + AntiAffinity *string ReplicationClusters []string - MaxConsumersPerTopic int - MaxProducersPerTopic int - MaxConsumersPerSubscription int - MessageTTLInSeconds int - SchemaValidationEnforce bool - SchemaCompatibilityStrategy string - IsAllowAutoUpdateSchema bool - OffloadThresholdSizeInMb int + MaxConsumersPerTopic *int + MaxProducersPerTopic *int + MaxConsumersPerSubscription *int + MessageTTLInSeconds *int + SchemaValidationEnforce *bool + SchemaCompatibilityStrategy *string + IsAllowAutoUpdateSchema *bool + OffloadThresholdSizeInMb *int } SplitNS struct { From ad850ff0edb5302cc7b4923d9b79e3b4db0eba5c Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 9 Jan 2025 00:16:04 +0800 Subject: [PATCH 6/9] revert changes --- pulsar/resource_pulsar_namespace.go | 191 +++++++++++++--------------- types/types.go | 18 +-- 2 files changed, 97 insertions(+), 112 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index b9a0582..5bbef83 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -31,7 +31,6 @@ import ( "github.com/streamnative/terraform-provider-pulsar/hashcode" "github.com/streamnative/terraform-provider-pulsar/types" - "k8s.io/utils/ptr" ) func resourcePulsarNamespace() *schema.Resource { @@ -71,7 +70,6 @@ func resourcePulsarNamespace() *schema.Resource { "enable_deduplication": { Type: schema.TypeBool, Optional: true, - Computed: true, }, "dispatch_rate": { Type: schema.TypeSet, @@ -144,85 +142,74 @@ func resourcePulsarNamespace() *schema.Resource { Set: hashBacklogQuotaSubset(), }, "namespace_config": { - Type: schema.TypeSet, + Type: schema.TypeList, Optional: true, Description: descriptions["namespace_config"], - MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "anti_affinity": { Type: schema.TypeString, Optional: true, ValidateFunc: validateNotBlank, - Computed: true, - Default: nil, }, "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, + Default: 0, ValidateFunc: validateGtEq0, - Computed: true, - Default: nil, }, "max_consumers_per_topic": { Type: schema.TypeInt, Optional: true, + Default: 0, ValidateFunc: validateGtEq0, - Computed: true, - Default: nil, }, "max_producers_per_topic": { Type: schema.TypeInt, Optional: true, + Default: 0, ValidateFunc: validateGtEq0, - Computed: true, - Default: nil, }, "message_ttl_seconds": { Type: schema.TypeInt, Optional: true, + Default: 0, ValidateFunc: validateGtEq0, - Computed: true, - Default: nil, }, "replication_clusters": { Type: schema.TypeList, Optional: true, + Computed: true, MinItems: 1, Elem: &schema.Schema{ Type: schema.TypeString, }, - Computed: true, - Default: nil, }, "schema_validation_enforce": { Type: schema.TypeBool, Optional: true, - Computed: true, - Default: nil, + Default: false, }, "schema_compatibility_strategy": { Type: schema.TypeString, Optional: true, - ValidateFunc: validateNotBlank, Default: "Full", + ValidateFunc: validateNotBlank, }, "is_allow_auto_update_schema": { Type: schema.TypeBool, Optional: true, - Computed: true, - Default: nil, + Default: true, }, "offload_threshold_size_in_mb": { Type: schema.TypeInt, Optional: true, + Default: 0, ValidateFunc: validateGtEq0, - Computed: true, - Default: nil, }, }, }, - Set: namespaceConfigToHash, + //Set: namespaceConfigToHash, }, "persistence_policies": { Type: schema.TypeSet, @@ -345,7 +332,7 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("namespace", namespace) _ = d.Set("tenant", tenant) - if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 { + if namespaceConfig, ok := d.GetOk("namespace_config"); ok && len(namespaceConfig.([]interface{})) > 0 { afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) @@ -401,7 +388,7 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) } - _ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{ + _ = d.Set("namespace_config", []interface{}{ map[string]interface{}{ "anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""), "max_consumers_per_subscription": maxConsPerSub, @@ -414,7 +401,7 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me "is_allow_auto_update_schema": isAllowAutoUpdateSchema, "offload_threshold_size_in_mb": int(offloadTresholdSizeInMb), }, - })) + }) } if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 { @@ -531,7 +518,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, namespace := d.Get("namespace").(string) tenant := d.Get("tenant").(string) enableDeduplication, deduplicationDefined := d.GetOk("enable_deduplication") - namespaceConfig := d.Get("namespace_config").(*schema.Set) + namespaceConfig := d.Get("namespace_config").([]interface{}) retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set) backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set) dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) @@ -547,72 +534,66 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, var errs error - if d.HasChange("namespace_config") { - if namespaceConfig.Len() > 0 { - nsCfg := unmarshalNamespaceConfig(namespaceConfig) + if len(namespaceConfig) > 0 { + nsCfg := unmarshalNamespaceConfigList(namespaceConfig) - if nsCfg.AntiAffinity != nil && len(*nsCfg.AntiAffinity) > 0 { - if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), *nsCfg.AntiAffinity); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceAntiAffinityGroup: %w", err)) - } + if len(nsCfg.AntiAffinity) > 0 { + if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), nsCfg.AntiAffinity); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceAntiAffinityGroup: %w", err)) } + } - if len(nsCfg.ReplicationClusters) > 0 { - if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) - } + if len(nsCfg.ReplicationClusters) > 0 { + if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) } + } - if nsCfg.MaxConsumersPerTopic != nil && *nsCfg.MaxConsumersPerTopic >= 0 { - if err = client.SetMaxConsumersPerTopic(*nsName, *nsCfg.MaxConsumersPerTopic); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerTopic: %w", err)) - } + if nsCfg.MaxConsumersPerTopic >= 0 { + if err = client.SetMaxConsumersPerTopic(*nsName, nsCfg.MaxConsumersPerTopic); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerTopic: %w", err)) } + } - if nsCfg.MaxConsumersPerSubscription != nil && *nsCfg.MaxConsumersPerSubscription >= 0 { - if err = client.SetMaxConsumersPerSubscription(*nsName, *nsCfg.MaxConsumersPerSubscription); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerSubscription: %w", err)) - } + if nsCfg.MaxConsumersPerSubscription >= 0 { + if err = client.SetMaxConsumersPerSubscription(*nsName, nsCfg.MaxConsumersPerSubscription); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxConsumersPerSubscription: %w", err)) } + } - if nsCfg.MaxProducersPerTopic != nil && *nsCfg.MaxProducersPerTopic >= 0 { - if err = client.SetMaxProducersPerTopic(*nsName, *nsCfg.MaxProducersPerTopic); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetMaxProducersPerTopic: %w", err)) - } + if nsCfg.MaxProducersPerTopic >= 0 { + if err = client.SetMaxProducersPerTopic(*nsName, nsCfg.MaxProducersPerTopic); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetMaxProducersPerTopic: %w", err)) } + } - if nsCfg.MessageTTLInSeconds != nil && *nsCfg.MessageTTLInSeconds >= 0 { - if err = client.SetNamespaceMessageTTL(nsName.String(), *nsCfg.MessageTTLInSeconds); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceMessageTTL: %w", err)) - } + if nsCfg.MessageTTLInSeconds >= 0 { + if err = client.SetNamespaceMessageTTL(nsName.String(), nsCfg.MessageTTLInSeconds); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceMessageTTL: %w", err)) } + } - if nsCfg.OffloadThresholdSizeInMb != nil && *nsCfg.OffloadThresholdSizeInMb >= 0 { - if err = client.SetOffloadThreshold(*nsName, int64(*nsCfg.OffloadThresholdSizeInMb)); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetOffloadThreshold: %w", err)) - } + if nsCfg.OffloadThresholdSizeInMb >= 0 { + if err = client.SetOffloadThreshold(*nsName, int64(nsCfg.OffloadThresholdSizeInMb)); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetOffloadThreshold: %w", err)) } + } - if nsCfg.SchemaValidationEnforce != nil { - if err = client.SetSchemaValidationEnforced(*nsName, *nsCfg.SchemaValidationEnforce); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) - } - } + if err = client.SetSchemaValidationEnforced(*nsName, nsCfg.SchemaValidationEnforce); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) + } - if nsCfg.SchemaCompatibilityStrategy != nil && len(*nsCfg.SchemaCompatibilityStrategy) > 0 { - strategy, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(*nsCfg.SchemaCompatibilityStrategy) - if err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) - } else if err = client.SetSchemaAutoUpdateCompatibilityStrategy(*nsName, strategy); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) - } - } - if nsCfg.IsAllowAutoUpdateSchema != nil { - if err = client.SetIsAllowAutoUpdateSchema(*nsName, *nsCfg.IsAllowAutoUpdateSchema); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) - } + if len(nsCfg.SchemaCompatibilityStrategy) > 0 { + strategy, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(nsCfg.SchemaCompatibilityStrategy) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) + } else if err = client.SetSchemaAutoUpdateCompatibilityStrategy(*nsName, strategy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) } } + if err = client.SetIsAllowAutoUpdateSchema(*nsName, nsCfg.IsAllowAutoUpdateSchema); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) + } } if retentionPoliciesConfig.Len() > 0 { @@ -846,33 +827,37 @@ func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig { rplClusters := data["replication_clusters"].([]interface{}) nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) - if v, has := data["max_producers_per_topic"]; has { - nsConfig.MaxProducersPerTopic = ptr.To(v.(int)) - } - if v, has := data["max_consumers_per_topic"]; has { - nsConfig.MaxConsumersPerTopic = ptr.To(v.(int)) - } - if v, has := data["max_consumers_per_subscription"]; has { - nsConfig.MaxConsumersPerSubscription = ptr.To(v.(int)) - } - if v, has := data["message_ttl_seconds"]; has { - nsConfig.MessageTTLInSeconds = ptr.To(v.(int)) - } - if v, has := data["anti_affinity"]; has { - nsConfig.AntiAffinity = ptr.To(v.(string)) - } - if v, has := data["schema_validation_enforce"]; has { - nsConfig.SchemaValidationEnforce = ptr.To(v.(bool)) - } - if v, has := data["schema_compatibility_strategy"]; has { - nsConfig.SchemaCompatibilityStrategy = ptr.To(v.(string)) - } - if v, has := data["is_allow_auto_update_schema"]; has { - nsConfig.IsAllowAutoUpdateSchema = ptr.To(v.(bool)) - } - if v, has := data["offload_threshold_size_in_mb"]; has { - nsConfig.OffloadThresholdSizeInMb = ptr.To(v.(int)) - } + nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int) + nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int) + nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int) + nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int) + nsConfig.AntiAffinity = data["anti_affinity"].(string) + nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) + nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) + nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) + nsConfig.OffloadThresholdSizeInMb = data["offload_threshold_size_in_mb"].(int) + } + + return &nsConfig +} + +func unmarshalNamespaceConfigList(v []interface{}) *types.NamespaceConfig { + var nsConfig types.NamespaceConfig + + for _, ns := range v { + data := ns.(map[string]interface{}) + rplClusters := data["replication_clusters"].([]interface{}) + + nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) + nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int) + nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int) + nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int) + nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int) + nsConfig.AntiAffinity = data["anti_affinity"].(string) + nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) + nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) + nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) + nsConfig.OffloadThresholdSizeInMb = data["offload_threshold_size_in_mb"].(int) } return &nsConfig diff --git a/types/types.go b/types/types.go index fe66509..18a3296 100644 --- a/types/types.go +++ b/types/types.go @@ -24,16 +24,16 @@ import ( type ( // configurable features of the Pulsar Namespace Entity via Terraform NamespaceConfig struct { - AntiAffinity *string + AntiAffinity string ReplicationClusters []string - MaxConsumersPerTopic *int - MaxProducersPerTopic *int - MaxConsumersPerSubscription *int - MessageTTLInSeconds *int - SchemaValidationEnforce *bool - SchemaCompatibilityStrategy *string - IsAllowAutoUpdateSchema *bool - OffloadThresholdSizeInMb *int + MaxConsumersPerTopic int + MaxProducersPerTopic int + MaxConsumersPerSubscription int + MessageTTLInSeconds int + SchemaValidationEnforce bool + SchemaCompatibilityStrategy string + IsAllowAutoUpdateSchema bool + OffloadThresholdSizeInMb int } SplitNS struct { From 1f076e3f3d2a4ebce0eff46354f07b8964cd8015 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 9 Jan 2025 00:20:33 +0800 Subject: [PATCH 7/9] fix lint --- pulsar/resource_pulsar_namespace.go | 40 ----------------------------- 1 file changed, 40 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 5bbef83..5d8eddb 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -209,7 +209,6 @@ func resourcePulsarNamespace() *schema.Resource { }, }, }, - //Set: namespaceConfigToHash, }, "persistence_policies": { Type: schema.TypeSet, @@ -746,23 +745,6 @@ func retentionPoliciesToHash(v interface{}) int { return hashcode.String(buf.String()) } -func namespaceConfigToHash(v interface{}) int { - var buf bytes.Buffer - m := v.(map[string]interface{}) - - buf.WriteString(fmt.Sprintf("%s-", m["anti_affinity"].(string))) - buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_subscription"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_topic"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["max_producers_per_topic"].(int))) - buf.WriteString(fmt.Sprintf("%d-", m["message_ttl_seconds"].(int))) - buf.WriteString(fmt.Sprintf("%s-", m["replication_clusters"].([]interface{}))) - buf.WriteString(fmt.Sprintf("%t-", m["schema_validation_enforce"].(bool))) - buf.WriteString(fmt.Sprintf("%s-", m["schema_compatibility_strategy"].(string))) - buf.WriteString(fmt.Sprintf("%d-", m["offload_threshold_size_in_mb"].(int))) - - return hashcode.String(buf.String()) -} - func persistencePoliciesToHash(v interface{}) int { var buf bytes.Buffer m := v.(map[string]interface{}) @@ -819,28 +801,6 @@ func unmarshalRetentionPolicies(v *schema.Set) *utils.RetentionPolicies { return &rtnPolicies } -func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig { - var nsConfig types.NamespaceConfig - - for _, ns := range v.List() { - data := ns.(map[string]interface{}) - rplClusters := data["replication_clusters"].([]interface{}) - - nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) - nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int) - nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int) - nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int) - nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int) - nsConfig.AntiAffinity = data["anti_affinity"].(string) - nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) - nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) - nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) - nsConfig.OffloadThresholdSizeInMb = data["offload_threshold_size_in_mb"].(int) - } - - return &nsConfig -} - func unmarshalNamespaceConfigList(v []interface{}) *types.NamespaceConfig { var nsConfig types.NamespaceConfig From 1f7dec3efac1e63b1410647ec1448493086cb1f9 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 9 Jan 2025 00:47:31 +0800 Subject: [PATCH 8/9] fix test cases --- pulsar/resource_pulsar_namespace.go | 49 +++++++++++++++--------- pulsar/resource_pulsar_namespace_test.go | 9 +++++ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 5d8eddb..fd07e29 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -331,75 +331,86 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me _ = d.Set("namespace", namespace) _ = d.Set("tenant", tenant) - if namespaceConfig, ok := d.GetOk("namespace_config"); ok && len(namespaceConfig.([]interface{})) > 0 { + if _, ok := d.GetOk("namespace_config"); ok { + var namespaceConfig = make(map[string]interface{}) afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err)) + } else { + namespaceConfig["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"") } maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) + } else { + namespaceConfig["max_consumers_per_subscription"] = maxConsPerSub } maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err)) + } else { + namespaceConfig["max_consumers_per_topic"] = maxConsPerTopic } maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) + } else { + namespaceConfig["max_producers_per_topic"] = maxProdPerTopic } messageTTL, err := client.GetNamespaceMessageTTL(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)) + } else { + namespaceConfig["message_ttl_seconds"] = messageTTL } schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) + } else { + namespaceConfig["schema_validation_enforce"] = schemaValidationEnforce } schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + if !strings.Contains(err.Error(), "Invalid auth strategy") { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + } + } else { + namespaceConfig["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() } replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)) - } - - replClusters := make([]interface{}, len(replClustersRaw)) - for i, cl := range replClustersRaw { - replClusters[i] = cl + } else { + replClusters := make([]interface{}, len(replClustersRaw)) + for i, cl := range replClustersRaw { + replClusters[i] = cl + } + namespaceConfig["replication_clusters"] = replClusters } isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + } else { + namespaceConfig["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema } offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) + } else { + namespaceConfig["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) } _ = d.Set("namespace_config", []interface{}{ - map[string]interface{}{ - "anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""), - "max_consumers_per_subscription": maxConsPerSub, - "max_consumers_per_topic": maxConsPerTopic, - "max_producers_per_topic": maxProdPerTopic, - "message_ttl_seconds": messageTTL, - "replication_clusters": replClusters, - "schema_validation_enforce": schemaValidationEnforce, - "schema_compatibility_strategy": schemaCompatibilityStrategy.String(), - "is_allow_auto_update_schema": isAllowAutoUpdateSchema, - "offload_threshold_size_in_mb": int(offloadTresholdSizeInMb), - }, + namespaceConfig, }) } diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index f967aed..de47d4c 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -184,6 +184,15 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) { resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"), resource.TestCheckNoResourceAttr(resourceName, "permission_grant.#"), ), + }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, ExpectNonEmptyPlan: true, }, }, From 0f7f4387584f50977704a06b88c78519a46a9dda Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 9 Jan 2025 00:54:50 +0800 Subject: [PATCH 9/9] fix sink test case --- pulsar/resource_pulsar_namespace_test.go | 5 +++++ pulsar/resource_pulsar_sink_test.go | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index de47d4c..b792833 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -426,6 +426,11 @@ func TestNamespaceWithUndefinedOptionalsDrift(t *testing.T) { ), ExpectNonEmptyPlan: false, }, + { + Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName), + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, }, }) } diff --git a/pulsar/resource_pulsar_sink_test.go b/pulsar/resource_pulsar_sink_test.go index 5d1784c..55752e3 100644 --- a/pulsar/resource_pulsar_sink_test.go +++ b/pulsar/resource_pulsar_sink_test.go @@ -254,7 +254,8 @@ func TestSinkUpdate(t *testing.T) { t.Fatal(err) } configString := string(configBytes) - configString = strings.ReplaceAll(configString, "sink-1", "update-sink-test-1") + newName := "sink" + acctest.RandString(10) + configString = strings.ReplaceAll(configString, "sink-1", newName) resource.Test(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, @@ -265,7 +266,7 @@ func TestSinkUpdate(t *testing.T) { { Config: configString, Check: resource.ComposeTestCheckFunc(func(s *terraform.State) error { - name := "pulsar_sink.update-sink-test-1" + name := "pulsar_sink." + newName rs, ok := s.RootModule().Resources[name] if !ok { return fmt.Errorf("%s not be found", name)