Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor connection management #17

Merged
merged 12 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
xk6-cb
115 changes: 84 additions & 31 deletions couchbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xk6_couchbase

import (
"fmt"
"sync"
"time"

"github.com/couchbase/gocb/v2"
Expand All @@ -12,32 +13,89 @@ func init() {
k6modules.Register("k6/x/couchbase", new(CouchBase))
}

var (
singletonClient *Client
errz error
once sync.Once
)

type CouchBase struct{}

type Client struct {
client *gocb.Cluster
cluster *gocb.Cluster

// Key: bucketName (string)
// Value: *gocb.Bucket
bucketsConnections sync.Map
mu sync.Mutex
}

func (*CouchBase) NewClient(connectionString, username, password string) interface{} {
// For a secure cluster connection, use `couchbases://<your-cluster-ip>` instead.
cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
func getCouchbaseInstance(connectionString string, username string, password string) (*Client, error) {
once.Do(
func() {
// For a secure cluster connection, use `couchbases://<your-cluster-ip>` instead.
cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
},
})

if err != nil {
errz = err
return
}

singletonClient = &Client{cluster: cluster}
},
})
)
return singletonClient, errz
}

func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} {
client, err := getCouchbaseInstance(connectionString, username, password)
if err != nil {
return err
return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, errz)
}
return client
}

// TODO: Create bucket connections on inits and remove mutex.
func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) {
bucket, found := c.bucketsConnections.Load(bucketName)
if !found || bucket == nil {
// TODO: Replace printfs with logrus (used in other extensions..)
// fmt.Printf("bucket %s not found, client instance: %v\n", bucketName, c)
// Create bucket connections
c.mu.Lock()
sharathjag marked this conversation as resolved.
Show resolved Hide resolved
defer c.mu.Unlock()
bucket, found := c.bucketsConnections.Load(bucketName)
if found && bucket != nil {
return bucket.(*gocb.Bucket), nil
}

return &Client{client: cluster}
newBucket := c.cluster.Bucket(bucketName)
// fmt.Printf("bucket %s connected\n", bucketName)
err := newBucket.WaitUntilReady(5*time.Second, nil)
if err != nil {
return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err)
}
// fmt.Printf("bucket %s ready\n", bucketName)
c.bucketsConnections.Store(bucketName, newBucket)
bucket, loaded := c.bucketsConnections.Load(bucketName)
if !loaded {
return nil, fmt.Errorf("failed to load bucket %s", bucketName)
}
// fmt.Printf("bucket %s loaded %v\n", bucket.(*gocb.Bucket).Name(), loaded)
sharathjag marked this conversation as resolved.
Show resolved Hide resolved
return bucket.(*gocb.Bucket), nil
}
return bucket.(*gocb.Bucket), nil
}

func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
bucket, err := c.getBucket(bucketName)
if err != nil {
return err
return fmt.Errorf("failed to create bucket connection for insert. Err: %w", err)
}
col := bucket.Scope(scope).Collection(collection)
_, err = col.Insert(docId, doc, nil)
Expand All @@ -48,10 +106,9 @@ func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) er
}

func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
bucket, err := c.getBucket(bucketName)
if err != nil {
return err
return fmt.Errorf("failed to create bucket connection for upsert. Err: %w", err)
}
col := bucket.Scope(scope).Collection(collection)
_, err = col.Upsert(docId, doc, nil)
Expand All @@ -62,12 +119,10 @@ func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) er
}

func (c *Client) Remove(bucketName, scope, collection, docId string) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
bucket, err := c.getBucket(bucketName)
if err != nil {
return err
return fmt.Errorf("failed to create bucket connection for remove. Err: %w", err)
}

col := bucket.Scope(scope).Collection(collection)

// Remove with Durability
Expand All @@ -82,18 +137,17 @@ func (c *Client) Remove(bucketName, scope, collection, docId string) error {
}

func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[string]any) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return fmt.Errorf("failed to create bucket connection for insertBatch. Err: %w", err)
}

batchItems := make([]gocb.BulkOp, len(docs))
index := 0
for k, v := range docs {
batchItems[index] = &gocb.InsertOp{ID: k, Value: v}
index++
}

bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
if err != nil {
return err
}
col := bucket.Scope(scope).Collection(collection)
err = col.Do(batchItems, &gocb.BulkOpOptions{Timeout: 3 * time.Second})
if err != nil {
Expand All @@ -106,7 +160,7 @@ func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[stri
func (c *Client) Find(query string) (any, error) {
var result interface{}

queryResult, err := c.client.Query(
queryResult, err := c.cluster.Query(
fmt.Sprintf(query),
&gocb.QueryOptions{},
)
Expand All @@ -127,10 +181,9 @@ func (c *Client) Find(query string) (any, error) {

func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, error) {
var result interface{}
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
bucket, err := c.getBucket(bucketName)
if err != nil {
return result, err
return result, fmt.Errorf("failed to create bucket connection for findOne. Err: %w", err)
}
bucketScope := bucket.Scope(scope)

Expand All @@ -149,7 +202,7 @@ func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, erro

func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, error) {
var result interface{}
queryResult, err := c.client.Query(
queryResult, err := c.cluster.Query(
query,
&gocb.QueryOptions{
Adhoc: true,
Expand Down