Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor connection management #17

Merged
merged 12 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
xk6-cb
235 changes: 197 additions & 38 deletions couchbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xk6_couchbase

import (
"fmt"
"sync"
"time"

"github.com/couchbase/gocb/v2"
Expand All @@ -12,32 +13,104 @@ func init() {
k6modules.Register("k6/x/couchbase", new(CouchBase))
}

const (
defaultBucketReadinessTimeout = 5 * time.Second
defaultDoConnectionPerVU = true
defaultConnectionBufferSizeBytes = 2048
)

var (
singletonClient *Client
errz error
once sync.Once
)

type CouchBase struct{}

type options struct {
DoConnectionPerVU bool `json:"do_connection_per_vu,omitempty"`
BucketReadinessTimeout time.Duration `json:"bucket_readiness_timeout,omitempty"`
BucketsToWarm []string `json:"buckets_to_warm,omitempty"`
ConnectionBufferSizeBytes int `json:"connection_buffer_size_bytes,omitempty"`
}

type DBConfig struct {
Hostname string `json:"connection_string,omitempty"`
Username string `json:"-"`
Password string `json:"-"`
}

type Client struct {
client *gocb.Cluster
cluster *gocb.Cluster
options options

// Key: bucketName (string)
// Value: *gocb.Bucket
bucketsConnections sync.Map
mu sync.Mutex
}

func (*CouchBase) NewClient(connectionString, username, password string) interface{} {
// For a secure cluster connection, use `couchbases://<your-cluster-ip>` instead.
cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: username,
Password: password,
},
})
func (c *CouchBase) NewClientPerVU(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string, connectionBufferSizeBytes int) (*Client, error) {
opts := options{
DoConnectionPerVU: true,
BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration),
BucketsToWarm: bucketsToWarm,
ConnectionBufferSizeBytes: connectionBufferSizeBytes,
}
return c.NewClientWithOptions(dbConfig, opts)
}

func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string, connectionBufferSizeBytes int) (*Client, error) {
opts := options{
DoConnectionPerVU: false,
BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration),
BucketsToWarm: bucketsToWarm,
ConnectionBufferSizeBytes: connectionBufferSizeBytes,
}

return c.NewClientWithOptions(dbConfig, opts)
}

func (c *CouchBase) NewClientWithOptions(dbConfig DBConfig, opts options) (*Client, error) {
if opts.ConnectionBufferSizeBytes < 1 {
opts.ConnectionBufferSizeBytes = defaultConnectionBufferSizeBytes
}
client, err := getCouchbaseInstance(dbConfig, opts)
if err != nil {
return err
return nil, fmt.Errorf("failed to create new couchbase connection with options for cluster %s. Err: %w", dbConfig.Hostname, err)
}

return &Client{client: cluster}
// Optionally warm the bucket on client's request
for _, bucket := range opts.BucketsToWarm {
_, err := client.connectBucketOrLoad(bucket)
if err != nil {
return nil, fmt.Errorf("failed to connect to bucket :%s, Err: %w", bucket, err)
}
}
return client, nil
}

func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} {
dbConfig := DBConfig{
Hostname: connectionString,
Username: username,
Password: password,
}
opts := options{
DoConnectionPerVU: defaultDoConnectionPerVU,
BucketReadinessTimeout: defaultBucketReadinessTimeout,
}
client, err := getCouchbaseInstance(dbConfig, opts)
if err != nil {
return err
return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, err)
}
return client
}

func (c *Client) Insert(bucketName string, scope string, collection string, docId string, doc any) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return fmt.Errorf("failed to create bucket connection for insert. Err: %w", err)
}
col := bucket.Scope(scope).Collection(collection)
_, err = col.Insert(docId, doc, nil)
Expand All @@ -47,11 +120,10 @@ func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) er
return nil
}

func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
func (c *Client) Upsert(bucketName string, scope string, collection string, docId string, doc any) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return err
return fmt.Errorf("failed to create bucket connection for upsert. Err: %w", err)
}
col := bucket.Scope(scope).Collection(collection)
_, err = col.Upsert(docId, doc, nil)
Expand All @@ -61,13 +133,11 @@ func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) er
return nil
}

func (c *Client) Remove(bucketName, scope, collection, docId string) error {
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
func (c *Client) Remove(bucketName string, scope string, collection string, docId string) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return err
return fmt.Errorf("failed to create bucket connection for remove. Err: %w", err)
}

col := bucket.Scope(scope).Collection(collection)

// Remove with Durability
Expand All @@ -81,19 +151,18 @@ func (c *Client) Remove(bucketName, scope, collection, docId string) error {
return nil
}

func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[string]any) error {
func (c *Client) InsertBatch(bucketName string, scope string, collection string, docs map[string]any) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return fmt.Errorf("failed to create bucket connection for insertBatch. Err: %w", err)
}

batchItems := make([]gocb.BulkOp, len(docs))
index := 0
for k, v := range docs {
batchItems[index] = &gocb.InsertOp{ID: k, Value: v}
index++
}

bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
if err != nil {
return err
}
col := bucket.Scope(scope).Collection(collection)
err = col.Do(batchItems, &gocb.BulkOpOptions{Timeout: 3 * time.Second})
if err != nil {
Expand All @@ -106,7 +175,7 @@ func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[stri
func (c *Client) Find(query string) (any, error) {
var result interface{}

queryResult, err := c.client.Query(
queryResult, err := c.cluster.Query(
fmt.Sprintf(query),
&gocb.QueryOptions{},
)
Expand All @@ -125,15 +194,26 @@ func (c *Client) Find(query string) (any, error) {
return result, nil
}

func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, error) {
var result interface{}
bucket := c.client.Bucket(bucketName)
err := bucket.WaitUntilReady(5*time.Second, nil)
func (c *Client) Exists(bucketName string, scope string, collection string, docId string) error {
bucket, err := c.getBucket(bucketName)
if err != nil {
return result, err
return fmt.Errorf("failed to get bucket connection for findOne. Err: %w", err)
}
bucketScope := bucket.Scope(scope)
_, err = bucketScope.Collection(collection).Exists(docId, nil)
if err != nil {
return err
}
return nil
}

func (c *Client) FindOne(bucketName string, scope string, collection string, docId string) (any, error) {
var result interface{}
bucket, err := c.getBucket(bucketName)
if err != nil {
return result, fmt.Errorf("failed to get bucket connection for findOne. Err: %w", err)
}
bucketScope := bucket.Scope(scope)
getResult, err := bucketScope.Collection(collection).Get(docId, nil)
if err != nil {
return result, err
Expand All @@ -149,7 +229,7 @@ func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, erro

func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, error) {
var result interface{}
queryResult, err := c.client.Query(
queryResult, err := c.cluster.Query(
query,
&gocb.QueryOptions{
Adhoc: true,
Expand All @@ -170,3 +250,82 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e

return result, nil
}

func (c *Client) Close() error {
opts := gocb.ClusterCloseOptions{}
return c.cluster.Close(&opts)
}

// TODO: Create bucket connections on inits and remove mutex.
func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) {
return c.connectBucketOrLoad(bucketName)
}

func (c *Client) connectBucketOrLoad(bucketName string) (*gocb.Bucket, error) {
bucket, found := c.bucketsConnections.Load(bucketName)
if !found || bucket == nil {
// Create bucket connections
// Mutex Lock to ensure that the bucket is instantiated only once in shared cluster connection mode.
c.mu.Lock()
defer c.mu.Unlock()
bucket, found := c.bucketsConnections.Load(bucketName)
if found && bucket != nil {
return bucket.(*gocb.Bucket), nil
}

newBucket := c.cluster.Bucket(bucketName)
err := newBucket.WaitUntilReady(c.options.BucketReadinessTimeout, nil)
if err != nil {
return nil, fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.BucketReadinessTimeout, err)
}
c.bucketsConnections.Store(bucketName, newBucket)
return newBucket, nil

}
return bucket.(*gocb.Bucket), nil
}

func getCouchbaseInstance(dbConfig DBConfig, opts options) (*Client, error) {
if opts.DoConnectionPerVU {
return instantiateNewConnection(dbConfig, opts)
}

once.Do(
func() {
client, err := instantiateNewConnection(dbConfig, opts)
if err != nil {
errz = err
return
}
singletonClient = client
},
)
return singletonClient, errz
}

func instantiateNewConnection(dbConfig DBConfig, options options) (*Client, error) {
// For a secure cluster connection, use `couchbases://<your-cluster-ip>` instead.
connStr := fmt.Sprintf("couchbase://%s?kv_buffer_size=2048", dbConfig.Hostname)
// connStr := "couchbase://"+dbConfig.Hostname
cluster, err := gocb.Connect(connStr, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: dbConfig.Username,
Password: dbConfig.Password,
},
// TODO: Set timeoutConfig
})
if err != nil {
return nil, fmt.Errorf("faile to instantiate new connection to couchbase cluster %s. Err: %w", dbConfig.Hostname, err)
}

return &Client{cluster: cluster, options: options}, nil
}

func parseStringToDuration(bucketReadinessDuration string) time.Duration {
readinessDuration, err := time.ParseDuration(bucketReadinessDuration)
if err != nil {
readinessDuration = defaultBucketReadinessTimeout
}

return readinessDuration
}
11 changes: 11 additions & 0 deletions examples/test-new-with-conn-per-vu.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import xk6_couchbase from 'k6/x/couchbase';


const dbConfig = { hostname: 'localhost', username: '<username>', password: '<password>' };
const bucketsToPreWarm = ['test'];
const client = xk6_couchbase.newClientPerVU(dbConfig, bucketsToPreWarm, "5s");
export default () => {
// syntax :: client.findOne("<db>", "<scope>", "<keyspace>", "<docId>");
var res = client.findOne("test", "_default", "_default", "002wPJwiJArcUpz");
console.log(res);
}
11 changes: 11 additions & 0 deletions examples/test-new-with-shared-conn.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import xk6_couchbase from 'k6/x/couchbase';


const dbConfig = { hostname: 'localhost', username: '<username>', password: '<password>' };
const bucketsToPreWarm = ['test'];
const client = xk6_couchbase.newClientWithSharedConnection(dbConfig, bucketsToPreWarm, "5s");
export default () => {
// syntax :: client.findOne("<db>", "<scope>", "<keyspace>", "<docId>");
var res = client.findOne("test", "_default", "_default", "002wPJwiJArcUpz");
console.log(res);
}
Loading
Loading