Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

force functions and connector using v3 api admin client #126

Merged
merged 9 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ A resource for creating and managing Apache Pulsar Functions.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

resource "pulsar_function" "function-1" {
Expand Down Expand Up @@ -462,7 +461,6 @@ A resource for creating and managing Apache Pulsar Sources.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

resource "pulsar_source" "source-1" {
Expand Down Expand Up @@ -514,7 +512,6 @@ A resource for creating and managing Apache Pulsar Sinks.
```hcl
provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

resource "pulsar_sink" "sample-sink-1" {
Expand Down
1 change: 0 additions & 1 deletion examples/functions/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

// Note: function resource requires v3 api.
Expand Down
1 change: 0 additions & 1 deletion examples/sinks/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

// Note: sink resource requires v3 api.
Expand Down
1 change: 0 additions & 1 deletion examples/sources/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ terraform {

provider "pulsar" {
web_service_url = "http://localhost:8080"
api_version = "3"
}

resource "pulsar_source" "source-1" {
Expand Down
6 changes: 5 additions & 1 deletion pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ import (
)

func getClientFromMeta(meta interface{}) admin.Client {
return meta.(admin.Client)
return meta.(PulsarClientBundle).Client
}

func getV3ClientFromMeta(meta interface{}) admin.Client {
return meta.(PulsarClientBundle).V3Client
}
49 changes: 45 additions & 4 deletions pulsar/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"os"
"strconv"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
pulsaradmin "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
adminconfig "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/pkg/errors"
Expand Down Expand Up @@ -69,6 +70,12 @@ func init() {
}
}

// PulsarClientBundle is a struct that holds the pulsar admin client for both v2 and v3 api versions
type PulsarClientBundle struct {
Client pulsaradmin.Client
V3Client pulsaradmin.Client
}

// Provider returns a schema.Provider
func Provider() *schema.Provider {
provider := &schema.Provider{
Expand Down Expand Up @@ -204,10 +211,17 @@ func providerConfigure(d *schema.ResourceData, tfVersion string) (interface{}, d
return nil, diag.FromErr(fmt.Errorf("ERROR_PULSAR_CONFIG_tls_TRUST_FILE_NOTEXIST: %q", TLSTrustCertsFilePath))
}

config := &config.Config{
configVersion := adminconfig.APIVersion(apiVersion)
// for backward compatibility, if user state api_version as 3
// we will use v2 as the default client version because we have v3 as individual client
if configVersion == adminconfig.V3 {
configVersion = adminconfig.APIVersion(0) // v2 will be the default client version
}

config := &adminconfig.Config{
WebServiceURL: clusterURL,
Token: token,
PulsarAPIVersion: config.APIVersion(apiVersion),
PulsarAPIVersion: configVersion,
TLSTrustCertsFilePath: TLSTrustCertsFilePath,
TLSAllowInsecureConnection: TLSAllowInsecureConnection,
IssuerEndpoint: issuerEndpoint,
Expand All @@ -226,7 +240,34 @@ func providerConfigure(d *schema.ResourceData, tfVersion string) (interface{}, d
return nil, diag.FromErr(err)
}

return client, nil
configV3 := &adminconfig.Config{
WebServiceURL: clusterURL,
Token: token,
PulsarAPIVersion: adminconfig.V3,
TLSTrustCertsFilePath: TLSTrustCertsFilePath,
TLSAllowInsecureConnection: TLSAllowInsecureConnection,
IssuerEndpoint: issuerEndpoint,
ClientID: clientID,
Audience: audience,
Scope: scope,
KeyFile: keyFilePath,
TLSKeyFile: TLSKeyFilePath,
TLSCertFile: TLSCertFilePath,
}

clientV3, err := admin.NewPulsarAdminClient(&admin.PulsarAdminConfig{
Config: configV3,
})
if err != nil {
return nil, diag.FromErr(err)
}

clientBundle := PulsarClientBundle{
Client: client,
V3Client: clientV3,
}

return clientBundle, nil
}

// Exists reports whether the named file or directory exists.
Expand Down
70 changes: 53 additions & 17 deletions pulsar/resource_pulsar_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-log/tflog"
Expand Down Expand Up @@ -380,7 +379,7 @@ func resourcePulsarFunction() *schema.Resource {
}

func resourcePulsarFunctionRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

tenant := d.Get(resourceFunctionTenantKey).(string)
namespace := d.Get(resourceFunctionNamespaceKey).(string)
Expand All @@ -396,13 +395,17 @@ func resourcePulsarFunctionRead(ctx context.Context, d *schema.ResourceData, met
return diag.FromErr(errors.Wrapf(err, "failed to get function %s", d.Id()))
}

unmarshalFunctionConfig(functionConfig, d)
err = unmarshalFunctionConfig(functionConfig, d)
if err != nil {
tflog.Debug(ctx, fmt.Sprintf("@@@Read function: %v", err))
return diag.Errorf("ERROR_UNMARSHAL_FUNCTION_CONFIG: %v", err)
}

return nil
}

func resourcePulsarFunctionCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

functionConfig, err := marshalFunctionConfig(d)
if err != nil {
Expand Down Expand Up @@ -434,7 +437,7 @@ func resourcePulsarFunctionCreate(ctx context.Context, d *schema.ResourceData, m
}

func resourcePulsarFunctionUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

functionConfig, err := marshalFunctionConfig(d)
if err != nil {
Expand Down Expand Up @@ -465,7 +468,7 @@ func resourcePulsarFunctionUpdate(ctx context.Context, d *schema.ResourceData, m
}

func resourcePulsarFunctionDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Functions()
client := getV3ClientFromMeta(meta).Functions()

tenant := d.Get(resourceFunctionTenantKey).(string)
namespace := d.Get(resourceFunctionNamespaceKey).(string)
Expand Down Expand Up @@ -676,19 +679,31 @@ func marshalFunctionConfig(d *schema.ResourceData) (*utils.FunctionConfig, error

func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.ResourceData) error {
if functionConfig.Jar != nil {
d.Set(resourceFunctionJarKey, *functionConfig.Jar)
err := d.Set(resourceFunctionJarKey, *functionConfig.Jar)
if err != nil {
return err
}
}

if functionConfig.Py != nil {
d.Set(resourceFunctionPyKey, *functionConfig.Py)
err := d.Set(resourceFunctionPyKey, *functionConfig.Py)
if err != nil {
return err
}
}

if functionConfig.Go != nil {
d.Set(resourceFunctionGoKey, *functionConfig.Go)
err := d.Set(resourceFunctionGoKey, *functionConfig.Go)
if err != nil {
return err
}
}

if functionConfig.ClassName != "" {
d.Set(resourceFunctionClassNameKey, functionConfig.ClassName)
err := d.Set(resourceFunctionClassNameKey, functionConfig.ClassName)
if err != nil {
return err
}
}

if len(functionConfig.Inputs) != 0 {
Expand All @@ -702,31 +717,52 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso
}

if functionConfig.TopicsPattern != nil {
d.Set(resourceFunctionTopicsPatternKey, *functionConfig.TopicsPattern)
err := d.Set(resourceFunctionTopicsPatternKey, *functionConfig.TopicsPattern)
if err != nil {
return err
}
}

if functionConfig.Parallelism != 0 {
d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
err := d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
if err != nil {
return err
}
}

if functionConfig.Output != "" {
d.Set(resourceFunctionOutputKey, functionConfig.Output)
err := d.Set(resourceFunctionOutputKey, functionConfig.Output)
if err != nil {
return err
}
}

if functionConfig.Parallelism != 0 {
d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
err := d.Set(resourceFunctionParallelismKey, functionConfig.Parallelism)
if err != nil {
return err
}
}

if functionConfig.ProcessingGuarantees != "" {
d.Set(resourceFunctionProcessingGuaranteesKey, functionConfig.ProcessingGuarantees)
err := d.Set(resourceFunctionProcessingGuaranteesKey, functionConfig.ProcessingGuarantees)
if err != nil {
return err
}
}

if functionConfig.SubName != "" {
d.Set(resourceFunctionSubscriptionNameKey, functionConfig.SubName)
err := d.Set(resourceFunctionSubscriptionNameKey, functionConfig.SubName)
if err != nil {
return err
}
}

if functionConfig.SubscriptionPosition != "" {
d.Set(resourceFunctionSubscriptionPositionKey, functionConfig.SubscriptionPosition)
err := d.Set(resourceFunctionSubscriptionPositionKey, functionConfig.SubscriptionPosition)
if err != nil {
return err
}
}

err := d.Set(resourceFunctionCleanupSubscriptionKey, functionConfig.CleanupSubscription)
Expand Down
11 changes: 6 additions & 5 deletions pulsar/resource_pulsar_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ package pulsar
import (
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
Expand All @@ -37,13 +36,13 @@ func init() {
}

func TestFunction(t *testing.T) {
configBytes, err := ioutil.ReadFile("testdata/function/main.tf")
configBytes, err := os.ReadFile("testdata/function/main.tf")
if err != nil {
t.Fatal(err)
}

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheckWithAPIVersion(t, config.V3) },
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
PreventPostDestroyRefresh: false,
CheckDestroy: testPulsarFunctionDestroy,
Expand All @@ -65,11 +64,13 @@ func TestFunction(t *testing.T) {
if config == nil {
return fmt.Errorf("failed to create %s function", rs.Primary.ID)
}
fmt.Printf("config: %v\n", config)

assert.Equal(t, "function-1", config.Name)
assert.Equal(t, "public", config.Tenant)
assert.Equal(t, "default", config.Namespace)
assert.Equal(t, ProcessingGuaranteesAtLeastOnce, config.ProcessingGuarantees)
assert.NotNil(t, config.TimeoutMs)
assert.Equal(t, int64(6666), *config.TimeoutMs)
assert.NotNil(t, config.Resources)

Expand Down Expand Up @@ -100,7 +101,7 @@ func testPulsarFunctionDestroy(s *terraform.State) error {
}

func getPulsarFunctionByResourceID(id string) (*utils.FunctionConfig, error) {
client := getClientFromMeta(testAccProvider.Meta()).Functions()
client := getV3ClientFromMeta(testAccProvider.Meta()).Functions()

parts := strings.Split(id, "/")
if len(parts) != 3 {
Expand Down
9 changes: 4 additions & 5 deletions pulsar/resource_pulsar_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -360,7 +359,7 @@ func resourcePulsarSink() *schema.Resource {
}

func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

sinkConfig, err := marshalSinkConfig(d)
if err != nil {
Expand All @@ -380,7 +379,7 @@ func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta
}

func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

tenant := d.Get(resourceSinkTenantKey).(string)
namespace := d.Get(resourceSinkNamespaceKey).(string)
Expand Down Expand Up @@ -599,7 +598,7 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in
}

func resourcePulsarSinkUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

sinkConfig, err := marshalSinkConfig(d)
if err != nil {
Expand All @@ -620,7 +619,7 @@ func resourcePulsarSinkUpdate(ctx context.Context, d *schema.ResourceData, meta
}

func resourcePulsarSinkDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(admin.Client).Sinks()
client := getV3ClientFromMeta(meta).Sinks()

tenant := d.Get(resourceSinkTenantKey).(string)
namespace := d.Get(resourceSinkNamespaceKey).(string)
Expand Down
Loading
Loading