diff --git a/pulsar/resource_pulsar_cluster_test.go b/pulsar/resource_pulsar_cluster_test.go index 360b1b7..840595e 100644 --- a/pulsar/resource_pulsar_cluster_test.go +++ b/pulsar/resource_pulsar_cluster_test.go @@ -56,6 +56,11 @@ func TestHandleExistingCluster(t *testing.T) { PreCheck: func() { testAccPreCheck(t) createCluster(t, cName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil { + t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err) + } + }) }, CheckDestroy: testPulsarClusterDestroy, ProviderFactories: testAccProviderFactories, @@ -75,6 +80,11 @@ func TestImportExistingCluster(t *testing.T) { PreCheck: func() { testAccPreCheck(t) createCluster(t, cName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil { + t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err) + } + }) }, CheckDestroy: testPulsarClusterDestroy, ProviderFactories: testAccProviderFactories, diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 1e2b7a2..1f15fd0 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -332,6 +332,11 @@ func TestImportExistingNamespace(t *testing.T) { PreCheck: func() { testAccPreCheck(t) createNamespace(t, id) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Namespaces().DeleteNamespace(id); err != nil { + t.Fatalf("ERROR_DELETING_TEST_NS: %v", err) + } + }) }, CheckDestroy: testPulsarNamespaceDestroy, ProviderFactories: testAccProviderFactories, diff --git a/pulsar/resource_pulsar_sink_test.go b/pulsar/resource_pulsar_sink_test.go index 8545334..5d1784c 100644 --- a/pulsar/resource_pulsar_sink_test.go +++ b/pulsar/resource_pulsar_sink_test.go @@ -115,12 +115,24 @@ func testPulsarSinkDestroy(s *terraform.State) error { func TestImportExistingSink(t *testing.T) { sinkName := acctest.RandString(6) - err := createSampleSink(sinkName) - if err != nil { - t.Fatal(err) - } resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + createSampleSink(sinkName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Sinks().DeleteSink( + "public", + "default", + sinkName, + ); err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + return + } + t.Fatalf("ERROR_DELETING_TEST_SINK: %v", err) + } + }) + }, ProviderFactories: testAccProviderFactories, CheckDestroy: testPulsarSinkDestroy, Steps: []resource.TestStep{ diff --git a/pulsar/resource_pulsar_source_test.go b/pulsar/resource_pulsar_source_test.go index 4c2405c..0ca7bf5 100644 --- a/pulsar/resource_pulsar_source_test.go +++ b/pulsar/resource_pulsar_source_test.go @@ -133,12 +133,23 @@ func getPulsarSourceByResourceID(id string) (*utils.SourceConfig, error) { func TestImportExistingSource(t *testing.T) { sourceName := acctest.RandString(6) - err := createSampleSource(sourceName) - if err != nil { - t.Fatal(err) - } resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + createSampleSource(sourceName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Sources().DeleteSource( + "public", + "default", + sourceName); err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + return + } + t.Fatalf("ERROR_DELETING_TEST_SOURCE: %v", err) + } + }) + }, ProviderFactories: testAccProviderFactories, CheckDestroy: testPulsarSourceDestroy, Steps: []resource.TestStep{ diff --git a/pulsar/resource_pulsar_tenant_test.go b/pulsar/resource_pulsar_tenant_test.go index 59cf358..14031f4 100644 --- a/pulsar/resource_pulsar_tenant_test.go +++ b/pulsar/resource_pulsar_tenant_test.go @@ -57,6 +57,11 @@ func TestHandleExistingTenant(t *testing.T) { PreCheck: func() { testAccPreCheck(t) createTenant(t, tName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil { + t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err) + } + }) }, ProviderFactories: testAccProviderFactories, PreventPostDestroyRefresh: false, @@ -77,6 +82,11 @@ func TestImportExistingTenant(t *testing.T) { PreCheck: func() { testAccPreCheck(t) createTenant(t, tName) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil { + t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err) + } + }) }, CheckDestroy: testPulsarTenantDestroy, ProviderFactories: testAccProviderFactories, diff --git a/pulsar/resource_pulsar_topic.go b/pulsar/resource_pulsar_topic.go index bb2e19c..bc5dfc8 100644 --- a/pulsar/resource_pulsar_topic.go +++ b/pulsar/resource_pulsar_topic.go @@ -21,6 +21,7 @@ import ( "context" "fmt" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/cenkalti/backoff/v4" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -162,6 +163,10 @@ func resourcePulsarTopicRead(ctx context.Context, d *schema.ResourceData, meta i topicName, found, err := getTopic(d, meta) if err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + d.SetId("") + return nil + } return diag.Errorf("%v", err) } if !found { diff --git a/pulsar/resource_pulsar_topic_test.go b/pulsar/resource_pulsar_topic_test.go index 8386c00..237c44f 100644 --- a/pulsar/resource_pulsar_topic_test.go +++ b/pulsar/resource_pulsar_topic_test.go @@ -44,15 +44,15 @@ func TestTopic(t *testing.T) { { Config: testPulsarPartitionTopic, Check: resource.ComposeTestCheckFunc( - testPulsarTopicExists("pulsar_topic.sample-topic-1"), - testPulsarTopicExists("pulsar_topic.sample-topic-2"), + testPulsarTopicExists("pulsar_topic.sample-topic-1", t), + testPulsarTopicExists("pulsar_topic.sample-topic-2", t), ), }, { Config: testPulsarNonPartitionTopic, Check: resource.ComposeTestCheckFunc( - testPulsarTopicExists("pulsar_topic.sample-topic-3"), - testPulsarTopicExists("pulsar_topic.sample-topic-4"), + testPulsarTopicExists("pulsar_topic.sample-topic-3", t), + testPulsarTopicExists("pulsar_topic.sample-topic-4", t), ), }, }, @@ -65,11 +65,20 @@ func TestImportExistingTopic(t *testing.T) { pnum := 10 fullID := strings.Join([]string{ttype + ":/", "public", "default", tname}, "/") + topicName, err := utils.GetTopicName(fullID) + if err != nil { + t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err) + } resource.Test(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) createTopic(t, fullID, pnum) + t.Cleanup(func() { + if err := getClientFromMeta(testAccProvider.Meta()).Topics().Delete(*topicName, true, pnum == 0); err != nil { + t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err) + } + }) }, ProviderFactories: testAccProviderFactories, CheckDestroy: testPulsarTopicDestroy, @@ -93,6 +102,61 @@ func TestPartionedTopicWithPermissionGrantUpdate(t *testing.T) { testTopicWithPermissionGrantUpdate(t, 10) } +func TestTopicNamespaceExternallyRemoved(t *testing.T) { + + resourceName := "pulsar_topic.test" + tName := acctest.RandString(10) + nsName := acctest.RandString(10) + topicName := acctest.RandString(10) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + CheckDestroy: testPulsarTopicDestroy, + Steps: []resource.TestStep{ + { + Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName), + Check: resource.ComposeTestCheckFunc( + testPulsarTopicExists(resourceName, t), + ), + ExpectError: nil, + }, + { + PreConfig: func() { + client := getClientFromMeta(testAccProvider.Meta()) + topicName, err := utils.GetTopicName(fmt.Sprintf("persistent://%s/%s/%s", tName, nsName, topicName)) + if err != nil { + t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err) + } + namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace()) + if err != nil { + t.Fatalf("ERROR_READ_NAMESPACE: %v", err) + } + partitionedTopics, nonPartitionedTopics, err := client.Topics().List(*namespace) + if err != nil { + t.Fatalf("ERROR_READ_TOPIC_DATA: %v", err) + } + + for _, topic := range append(partitionedTopics, nonPartitionedTopics...) { + if topicName.String() == topic { + if err = client.Topics().Delete(*topicName, true, true); err != nil { + t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err) + } + } + } + if err = client.Namespaces().DeleteNamespace(tName + "/" + nsName); err != nil { + t.Fatalf("ERROR_DELETING_TEST_NS: %v", err) + } + }, + Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName), + PlanOnly: true, + ExpectNonEmptyPlan: true, + ExpectError: nil, + }, + }, + }) +} + func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) { resourceName := "pulsar_topic.test" tname := acctest.RandString(10) @@ -114,7 +178,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) { actions = ["produce", "consume"] }`), Check: resource.ComposeTestCheckFunc( - testPulsarTopicExists(resourceName), + testPulsarTopicExists(resourceName, t), resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "2"), resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-1"), resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "3"), @@ -134,7 +198,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) { actions = ["produce"] }`), Check: resource.ComposeTestCheckFunc( - testPulsarTopicExists(resourceName), + testPulsarTopicExists(resourceName, t), resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "1"), resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-2"), resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "1"), @@ -177,7 +241,7 @@ func testPulsarTopicDestroy(s *terraform.State) error { return nil } -func testPulsarTopicExists(topic string) resource.TestCheckFunc { +func testPulsarTopicExists(topic string, t *testing.T) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[topic] if !ok { @@ -188,6 +252,7 @@ func testPulsarTopicExists(topic string) resource.TestCheckFunc { if err != nil { return fmt.Errorf("ERROR_READ_TOPIC: %w", err) } + t.Logf("topicName: %v", topicName) namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace()) if err != nil { return fmt.Errorf("ERROR_READ_NAMESPACE: %w", err) @@ -338,3 +403,41 @@ resource "pulsar_topic" "test" { } `, url, ttype, tname, pnum, permissionGrants) } + +func testPulsarNamespaceWithTopic(wsURL, tenant, ns, topicName string) string { + return fmt.Sprintf(` +provider "pulsar" { + web_service_url = "%s" +} + +resource "pulsar_tenant" "test_tenant" { + tenant = "%s" + allowed_clusters = ["standalone"] +} + +resource "pulsar_namespace" "test" { + tenant = pulsar_tenant.test_tenant.tenant + namespace = "%s" + + topic_auto_creation { + enable = false + } + + depends_on = [ + pulsar_tenant.test_tenant + ] +} + +resource "pulsar_topic" "test" { + tenant = "%s" + namespace = "%s" + topic_type = "persistent" + topic_name = "%s" + partitions = 0 + + depends_on = [ + pulsar_namespace.test + ] +} +`, wsURL, tenant, ns, tenant, ns, topicName) +}