From c9f94e10f65832c7a92471f3a4301483e515fd95 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 12:41:28 -0700 Subject: [PATCH 01/12] refactor connection management --- .gitignore | 1 + couchbase.go | 112 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 82 insertions(+), 31 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7199cbf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +xk6-cb diff --git a/couchbase.go b/couchbase.go index 73824bb..e4b484a 100644 --- a/couchbase.go +++ b/couchbase.go @@ -2,6 +2,7 @@ package xk6_couchbase import ( "fmt" + "sync" "time" "github.com/couchbase/gocb/v2" @@ -12,32 +13,86 @@ func init() { k6modules.Register("k6/x/couchbase", new(CouchBase)) } +var ( + singletonClient *Client + errz error + once sync.Once +) + type CouchBase struct{} type Client struct { - client *gocb.Cluster + cluster *gocb.Cluster + + // Key: bucketName (string) + // Value: *gocb.Bucket + bucketsConnections sync.Map + mu sync.Mutex } -func (*CouchBase) NewClient(connectionString, username, password string) interface{} { - // For a secure cluster connection, use `couchbases://` instead. - cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, +func getCouchbaseInstance(connectionString string, username string, password string) (*Client, error) { + once.Do( + func() { + // For a secure cluster connection, use `couchbases://` instead. + cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{ + Authenticator: gocb.PasswordAuthenticator{ + Username: username, + Password: password, + }, + }) + if err != nil { + errz = err + } + singletonClient = &Client{cluster: cluster} }, - }) + ) + return singletonClient, errz +} + +func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} { + client, err := getCouchbaseInstance(connectionString, username, password) if err != nil { - return err + return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, errz) } + return client +} + +// TODO: Create bucket connections on inits and remove mutex. +func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { + bucket, found := c.bucketsConnections.Load(bucketName) + if !found || bucket == nil { + fmt.Printf("bucket %s not found, client instance: %v\n", bucketName, c) + // Create bucket connections + c.mu.Lock() + defer c.mu.Unlock() + bucket, found := c.bucketsConnections.Load(bucketName) + if found && bucket != nil { + return bucket.(*gocb.Bucket), nil + } - return &Client{client: cluster} + newBucket := c.cluster.Bucket(bucketName) + fmt.Printf("bucket %s connected\n", bucketName) + err := newBucket.WaitUntilReady(5*time.Second, nil) + if err != nil { + return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err) + } + fmt.Printf("bucket %s ready\n", bucketName) + c.bucketsConnections.Store(bucketName, newBucket) + bucket, loaded := c.bucketsConnections.Load(bucketName) + if !loaded { + return nil, fmt.Errorf("failed to load bucket %s", bucketName) + } + fmt.Printf("bucket %s loaded %v\n", bucket.(*gocb.Bucket).Name(), loaded) + return bucket.(*gocb.Bucket), nil + } + return bucket.(*gocb.Bucket), nil } func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error { - bucket := c.client.Bucket(bucketName) - err := bucket.WaitUntilReady(5*time.Second, nil) + bucket, err := c.getBucket(bucketName) + // bucket := c.client.Bucket(bucketName) if err != nil { - return err + return fmt.Errorf("failed to create bucket connection for insert. Err: %w", err) } col := bucket.Scope(scope).Collection(collection) _, err = col.Insert(docId, doc, nil) @@ -48,10 +103,9 @@ func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) er } func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) error { - bucket := c.client.Bucket(bucketName) - err := bucket.WaitUntilReady(5*time.Second, nil) + bucket, err := c.getBucket(bucketName) if err != nil { - return err + return fmt.Errorf("failed to create bucket connection for upsert. Err: %w", err) } col := bucket.Scope(scope).Collection(collection) _, err = col.Upsert(docId, doc, nil) @@ -62,12 +116,10 @@ func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) er } func (c *Client) Remove(bucketName, scope, collection, docId string) error { - bucket := c.client.Bucket(bucketName) - err := bucket.WaitUntilReady(5*time.Second, nil) + bucket, err := c.getBucket(bucketName) if err != nil { - return err + return fmt.Errorf("failed to create bucket connection for remove. Err: %w", err) } - col := bucket.Scope(scope).Collection(collection) // Remove with Durability @@ -82,18 +134,17 @@ func (c *Client) Remove(bucketName, scope, collection, docId string) error { } func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[string]any) error { + bucket, err := c.getBucket(bucketName) + if err != nil { + return fmt.Errorf("failed to create bucket connection for insertBatch. Err: %w", err) + } + batchItems := make([]gocb.BulkOp, len(docs)) index := 0 for k, v := range docs { batchItems[index] = &gocb.InsertOp{ID: k, Value: v} index++ } - - bucket := c.client.Bucket(bucketName) - err := bucket.WaitUntilReady(5*time.Second, nil) - if err != nil { - return err - } col := bucket.Scope(scope).Collection(collection) err = col.Do(batchItems, &gocb.BulkOpOptions{Timeout: 3 * time.Second}) if err != nil { @@ -106,7 +157,7 @@ func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[stri func (c *Client) Find(query string) (any, error) { var result interface{} - queryResult, err := c.client.Query( + queryResult, err := c.cluster.Query( fmt.Sprintf(query), &gocb.QueryOptions{}, ) @@ -127,10 +178,9 @@ func (c *Client) Find(query string) (any, error) { func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, error) { var result interface{} - bucket := c.client.Bucket(bucketName) - err := bucket.WaitUntilReady(5*time.Second, nil) + bucket, err := c.getBucket(bucketName) if err != nil { - return result, err + return result, fmt.Errorf("failed to create bucket connection for findOne. Err: %w", err) } bucketScope := bucket.Scope(scope) @@ -149,7 +199,7 @@ func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, erro func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, error) { var result interface{} - queryResult, err := c.client.Query( + queryResult, err := c.cluster.Query( query, &gocb.QueryOptions{ Adhoc: true, From 04e1b2afbd21a6d5115826a930d1abb5d7eedfa3 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 13:23:54 -0700 Subject: [PATCH 02/12] refactor connection management - remove unused code --- couchbase.go | 1 - 1 file changed, 1 deletion(-) diff --git a/couchbase.go b/couchbase.go index e4b484a..895830d 100644 --- a/couchbase.go +++ b/couchbase.go @@ -90,7 +90,6 @@ func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error { bucket, err := c.getBucket(bucketName) - // bucket := c.client.Bucket(bucketName) if err != nil { return fmt.Errorf("failed to create bucket connection for insert. Err: %w", err) } From 2674390d4b2a7f45dacfb0fd702be84b66d1dc38 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 13:28:48 -0700 Subject: [PATCH 03/12] refactor connection management - do not set client connection when not created --- couchbase.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/couchbase.go b/couchbase.go index 895830d..784d864 100644 --- a/couchbase.go +++ b/couchbase.go @@ -40,9 +40,12 @@ func getCouchbaseInstance(connectionString string, username string, password str Password: password, }, }) + if err != nil { errz = err + return } + singletonClient = &Client{cluster: cluster} }, ) From fa881d53c316a24761df595033a3cef8685b8c9d Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 13:41:03 -0700 Subject: [PATCH 04/12] refactor connection management - comment out prints --- couchbase.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/couchbase.go b/couchbase.go index 784d864..9d90be0 100644 --- a/couchbase.go +++ b/couchbase.go @@ -64,7 +64,8 @@ func (*CouchBase) NewClient(connectionString string, username string, password s func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { bucket, found := c.bucketsConnections.Load(bucketName) if !found || bucket == nil { - fmt.Printf("bucket %s not found, client instance: %v\n", bucketName, c) + // TODO: Replace printfs with logrus (used in other extensions..) + // fmt.Printf("bucket %s not found, client instance: %v\n", bucketName, c) // Create bucket connections c.mu.Lock() defer c.mu.Unlock() @@ -74,18 +75,18 @@ func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { } newBucket := c.cluster.Bucket(bucketName) - fmt.Printf("bucket %s connected\n", bucketName) + // fmt.Printf("bucket %s connected\n", bucketName) err := newBucket.WaitUntilReady(5*time.Second, nil) if err != nil { return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err) } - fmt.Printf("bucket %s ready\n", bucketName) + // fmt.Printf("bucket %s ready\n", bucketName) c.bucketsConnections.Store(bucketName, newBucket) bucket, loaded := c.bucketsConnections.Load(bucketName) if !loaded { return nil, fmt.Errorf("failed to load bucket %s", bucketName) } - fmt.Printf("bucket %s loaded %v\n", bucket.(*gocb.Bucket).Name(), loaded) + // fmt.Printf("bucket %s loaded %v\n", bucket.(*gocb.Bucket).Name(), loaded) return bucket.(*gocb.Bucket), nil } return bucket.(*gocb.Bucket), nil From f8fc8e779962d1a12bab9a16807a1c60dc9f571d Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 17:50:45 -0700 Subject: [PATCH 05/12] refactor connection management - comment out prints --- README.md | 5 +++++ couchbase.go | 61 ++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index e70c19d..cbe803e 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,11 @@ xk6 build --output xk6-couchbase --with github.com/thotasrinath/xk6-couchbase=. xk6 build --with github.com/thotasrinath/xk6-couchbase@latest ``` +#### Note + +By default, a new connection is created per VU. +To share the connection across multiple queries set the environment variable K6_COUCHBASE_DO_CONN_PER_VU to true + ## Examples ### Document Insertion Test ```js diff --git a/couchbase.go b/couchbase.go index 9d90be0..6ac6850 100644 --- a/couchbase.go +++ b/couchbase.go @@ -1,7 +1,10 @@ package xk6_couchbase import ( + "errors" "fmt" + "os" + "strconv" "sync" "time" @@ -13,6 +16,8 @@ func init() { k6modules.Register("k6/x/couchbase", new(CouchBase)) } +const doConnectionPerVUEnvKey = "K6_COUCHBASE_DO_CONN_PER_VU" + var ( singletonClient *Client errz error @@ -30,22 +35,34 @@ type Client struct { mu sync.Mutex } +func instantiateNewConnection(connectionString string, username string, password string) (*gocb.Cluster, error) { + // For a secure cluster connection, use `couchbases://` instead. + return gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{ + Authenticator: gocb.PasswordAuthenticator{ + Username: username, + Password: password, + }, + }) +} + func getCouchbaseInstance(connectionString string, username string, password string) (*Client, error) { + doConnPerVu := getenvBoolValue(doConnectionPerVUEnvKey, false) + if doConnPerVu { + cluster, err := instantiateNewConnection(connectionString, username, password) + if err != nil { + return nil, fmt.Errorf("failed to create new connection for %s. Err: %w", connectionString, err) + } + client := &Client{cluster: cluster} + return client, nil + } + once.Do( func() { - // For a secure cluster connection, use `couchbases://` instead. - cluster, err := gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, - }, - }) - + cluster, err := instantiateNewConnection(connectionString, username, password) if err != nil { errz = err return } - singletonClient = &Client{cluster: cluster} }, ) @@ -64,8 +81,6 @@ func (*CouchBase) NewClient(connectionString string, username string, password s func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { bucket, found := c.bucketsConnections.Load(bucketName) if !found || bucket == nil { - // TODO: Replace printfs with logrus (used in other extensions..) - // fmt.Printf("bucket %s not found, client instance: %v\n", bucketName, c) // Create bucket connections c.mu.Lock() defer c.mu.Unlock() @@ -75,18 +90,15 @@ func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { } newBucket := c.cluster.Bucket(bucketName) - // fmt.Printf("bucket %s connected\n", bucketName) err := newBucket.WaitUntilReady(5*time.Second, nil) if err != nil { return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err) } - // fmt.Printf("bucket %s ready\n", bucketName) c.bucketsConnections.Store(bucketName, newBucket) bucket, loaded := c.bucketsConnections.Load(bucketName) if !loaded { return nil, fmt.Errorf("failed to load bucket %s", bucketName) } - // fmt.Printf("bucket %s loaded %v\n", bucket.(*gocb.Bucket).Name(), loaded) return bucket.(*gocb.Bucket), nil } return bucket.(*gocb.Bucket), nil @@ -223,3 +235,24 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e return result, nil } + +// TODO: Use gotdotenv or viper pkgs. Done to keep the backward compatibility. +func getenvStringValue(key string) (string, error) { + value := os.Getenv(key) + if value == "" { + return value, errors.New("failed to getenv string value") + } + return value, nil +} + +func getenvBoolValue(key string, defaultValue bool) bool { + s, err := getenvStringValue(key) + if err != nil { + return defaultValue + } + value, err := strconv.ParseBool(s) + if err != nil { + return defaultValue + } + return value +} From df448750d16af21eda70293c9673bf9a127af163 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 18:10:53 -0700 Subject: [PATCH 06/12] refactor connection management - make timeout env vars --- README.md | 3 ++- couchbase.go | 24 +++++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index cbe803e..4f4121c 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,8 @@ xk6 build --with github.com/thotasrinath/xk6-couchbase@latest #### Note By default, a new connection is created per VU. -To share the connection across multiple queries set the environment variable K6_COUCHBASE_DO_CONN_PER_VU to true +To share the connection across multiple queries set the environment variable XK6_COUCHBASE_DO_CONN_PER_VU to true +Bucket WaitUntilReady timeout can be set using XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT=20s ## Examples ### Document Insertion Test diff --git a/couchbase.go b/couchbase.go index 6ac6850..68df0b4 100644 --- a/couchbase.go +++ b/couchbase.go @@ -16,7 +16,11 @@ func init() { k6modules.Register("k6/x/couchbase", new(CouchBase)) } -const doConnectionPerVUEnvKey = "K6_COUCHBASE_DO_CONN_PER_VU" +const ( + // TODO: Define these as API constructs. Const/env variables to have backward compatibility. + doConnectionPerVUEnvKey = "XK6_COUCHBASE_DO_CONN_PER_VU" + bucketReadinessTimeout = "XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT" +) var ( singletonClient *Client @@ -89,8 +93,10 @@ func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { return bucket.(*gocb.Bucket), nil } + // TODO: Add retries. newBucket := c.cluster.Bucket(bucketName) - err := newBucket.WaitUntilReady(5*time.Second, nil) + readinessTimeout := getenvDurationValue(bucketReadinessTimeout, 5*time.Second) + err := newBucket.WaitUntilReady(readinessTimeout, nil) if err != nil { return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err) } @@ -236,7 +242,7 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e return result, nil } -// TODO: Use gotdotenv or viper pkgs. Done to keep the backward compatibility. +// TODO: Use gotdotenv or viper pkgs. func getenvStringValue(key string) (string, error) { value := os.Getenv(key) if value == "" { @@ -245,6 +251,18 @@ func getenvStringValue(key string) (string, error) { return value, nil } +func getenvDurationValue(key string, defaultValue time.Duration) time.Duration { + value, err := getenvStringValue(key) + if err != nil { + return defaultValue + } + duration, err := time.ParseDuration(value) + if err != nil { + return defaultValue + } + return duration +} + func getenvBoolValue(key string, defaultValue bool) bool { s, err := getenvStringValue(key) if err != nil { From 87c5aa370eeacc7c625a7fdf2f0d194ccf38d447 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 23:56:53 -0700 Subject: [PATCH 07/12] refactor connection management - make client with options --- couchbase.go | 228 +++++++++++++++++++++++++++++++++++---------------- go.sum | 45 ++++++++++ 2 files changed, 202 insertions(+), 71 deletions(-) diff --git a/couchbase.go b/couchbase.go index 68df0b4..2fcc3d0 100644 --- a/couchbase.go +++ b/couchbase.go @@ -1,10 +1,7 @@ package xk6_couchbase import ( - "errors" "fmt" - "os" - "strconv" "sync" "time" @@ -18,20 +15,42 @@ func init() { const ( // TODO: Define these as API constructs. Const/env variables to have backward compatibility. - doConnectionPerVUEnvKey = "XK6_COUCHBASE_DO_CONN_PER_VU" - bucketReadinessTimeout = "XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT" + // doConnectionPerVUEnvKey = "XK6_COUCHBASE_DO_CONN_PER_VU" + // bucketReadinessTimeout = "XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT" + defaultBucketReadinessTimeout = 5 * time.Second + defaultDoConnectionPerVU = false ) var ( singletonClient *Client - errz error once sync.Once ) type CouchBase struct{} +type Options struct { + doConnectionPerVU bool + bucketReadinessTimeout time.Duration + bucketsToWarm []string +} + +func newDefaultOptions() Options { + return Options{ + doConnectionPerVU: defaultDoConnectionPerVU, + bucketReadinessTimeout: defaultBucketReadinessTimeout, + } +} + +type DBConfig struct { + Hostname string `json:"connection_string,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` +} + type Client struct { cluster *gocb.Cluster + errz error + options Options // Key: bucketName (string) // Value: *gocb.Bucket @@ -39,50 +58,77 @@ type Client struct { mu sync.Mutex } -func instantiateNewConnection(connectionString string, username string, password string) (*gocb.Cluster, error) { +func instantiateNewConnection(dbConfig DBConfig, options Options) *Client { // For a secure cluster connection, use `couchbases://` instead. - return gocb.Connect("couchbase://"+connectionString, gocb.ClusterOptions{ + cluster, err := gocb.Connect("couchbase://"+dbConfig.Hostname, gocb.ClusterOptions{ Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, + Username: dbConfig.Username, + Password: dbConfig.Password, + }, + TimeoutsConfig: gocb.TimeoutsConfig{ + ConnectTimeout: 95 * time.Second, + QueryTimeout: 95 * time.Second, + SearchTimeout: 95 * time.Second, }, }) -} + if err != nil { + return &Client{errz: err, options: options} + } -func getCouchbaseInstance(connectionString string, username string, password string) (*Client, error) { - doConnPerVu := getenvBoolValue(doConnectionPerVUEnvKey, false) - if doConnPerVu { - cluster, err := instantiateNewConnection(connectionString, username, password) + client := &Client{cluster: cluster, options: options} + for _, bucket := range options.bucketsToWarm { + err := client.readyBucket(bucket) if err != nil { - return nil, fmt.Errorf("failed to create new connection for %s. Err: %w", connectionString, err) + return &Client{errz: err, options: options} } - client := &Client{cluster: cluster} - return client, nil + } + return client +} + +func getCouchbaseInstance(dbConfig DBConfig, options Options) *Client { + if options.doConnectionPerVU { + return instantiateNewConnection(dbConfig, options) } once.Do( func() { - cluster, err := instantiateNewConnection(connectionString, username, password) - if err != nil { - errz = err - return - } - singletonClient = &Client{cluster: cluster} + singletonClient = instantiateNewConnection(dbConfig, options) }, ) - return singletonClient, errz + return singletonClient } -func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} { - client, err := getCouchbaseInstance(connectionString, username, password) +func (*CouchBase) NewOptions(doConnectionPerVU bool, bucketReadinessTimeout string, bucketsToWarm []string) interface{} { + + readinessDuration, err := time.ParseDuration(bucketReadinessTimeout) if err != nil { - return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, errz) + return fmt.Errorf("failed to parse readiness timeout %v. Err: %w", bucketReadinessTimeout, err) + } + return Options{ + doConnectionPerVU: doConnectionPerVU, + bucketReadinessTimeout: readinessDuration, + bucketsToWarm: bucketsToWarm, + } +} + +func (*CouchBase) NewWithOptions(dbConfig DBConfig, options Options) *Client { + return getCouchbaseInstance(dbConfig, options) +} + +func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} { + dbConfig := DBConfig{ + Hostname: connectionString, + Username: username, + Password: password, + } + client := getCouchbaseInstance(dbConfig, newDefaultOptions()) + if client.errz != nil { + return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, client.errz) } return client } -// TODO: Create bucket connections on inits and remove mutex. -func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { +func (c *Client) readyBucket(bucketName string) error { bucket, found := c.bucketsConnections.Load(bucketName) if !found || bucket == nil { // Create bucket connections @@ -90,24 +136,64 @@ func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { defer c.mu.Unlock() bucket, found := c.bucketsConnections.Load(bucketName) if found && bucket != nil { - return bucket.(*gocb.Bucket), nil + return nil } // TODO: Add retries. newBucket := c.cluster.Bucket(bucketName) - readinessTimeout := getenvDurationValue(bucketReadinessTimeout, 5*time.Second) - err := newBucket.WaitUntilReady(readinessTimeout, nil) + err := newBucket.WaitUntilReady(c.options.bucketReadinessTimeout, nil) if err != nil { - return nil, fmt.Errorf("failed to wait for bucket %s. Err: %w", bucketName, err) + return fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.bucketReadinessTimeout, err) } c.bucketsConnections.Store(bucketName, newBucket) - bucket, loaded := c.bucketsConnections.Load(bucketName) - if !loaded { - return nil, fmt.Errorf("failed to load bucket %s", bucketName) - } - return bucket.(*gocb.Bucket), nil + } + return nil +} + +// TODO: Create bucket connections on inits and remove mutex. +func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { + err := c.readyBucket(bucketName) + if err != nil { + return nil, fmt.Errorf("failed to ready bucket %s, Err: %w", bucketName, err) + } + bucket, loaded := c.bucketsConnections.Load(bucketName) + if !loaded { + return nil, fmt.Errorf("failed to load bucket %s", bucketName) } return bucket.(*gocb.Bucket), nil + + // bucket, found := c.bucketsConnections.Load(bucketName) + // if !found || bucket == nil { + // // Create bucket connections + // c.mu.Lock() + // defer c.mu.Unlock() + // bucket, found := c.bucketsConnections.Load(bucketName) + // if found && bucket != nil { + // return bucket.(*gocb.Bucket), nil + // } + + // // TODO: Add retries. + // newBucket := c.cluster.Bucket(bucketName) + // err := newBucket.WaitUntilReady(c.options.bucketReadinessTimeout, nil) + // if err != nil { + // return nil, fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.bucketReadinessTimeout, err) + // } + // c.bucketsConnections.Store(bucketName, newBucket) + // bucket, loaded := c.bucketsConnections.Load(bucketName) + // if !loaded { + // return nil, fmt.Errorf("failed to load bucket %s", bucketName) + // } + // return bucket.(*gocb.Bucket), nil + // } + // return bucket.(*gocb.Bucket), nil +} + +func (c *Client) HasError() bool { + return c.errz != nil +} + +func (c *Client) GetError() string { + return c.errz.Error() } func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error { @@ -242,35 +328,35 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e return result, nil } -// TODO: Use gotdotenv or viper pkgs. -func getenvStringValue(key string) (string, error) { - value := os.Getenv(key) - if value == "" { - return value, errors.New("failed to getenv string value") - } - return value, nil -} - -func getenvDurationValue(key string, defaultValue time.Duration) time.Duration { - value, err := getenvStringValue(key) - if err != nil { - return defaultValue - } - duration, err := time.ParseDuration(value) - if err != nil { - return defaultValue - } - return duration -} - -func getenvBoolValue(key string, defaultValue bool) bool { - s, err := getenvStringValue(key) - if err != nil { - return defaultValue - } - value, err := strconv.ParseBool(s) - if err != nil { - return defaultValue - } - return value -} +// // TODO: Use gotdotenv or viper pkgs. +// func getenvStringValue(key string) (string, error) { +// value := os.Getenv(key) +// if value == "" { +// return value, errors.New("failed to getenv string value") +// } +// return value, nil +// } + +// func getenvDurationValue(key string, defaultValue time.Duration) time.Duration { +// value, err := getenvStringValue(key) +// if err != nil { +// return defaultValue +// } +// duration, err := time.ParseDuration(value) +// if err != nil { +// return defaultValue +// } +// return duration +// } + +// func getenvBoolValue(key string, defaultValue bool) bool { +// s, err := getenvStringValue(key) +// if err != nil { +// return defaultValue +// } +// value, err := strconv.ParseBool(s) +// if err != nil { +// return defaultValue +// } +// return value +// } diff --git a/go.sum b/go.sum index c6adb12..90dfb63 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,12 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ= github.com/couchbase/gocb/v2 v2.9.1/go.mod h1:TMAeK34yUdcASdV4mGcYuwtkAWckRBYN5uvMCEgPfXo= +github.com/couchbase/gocb/v2 v2.9.2 h1:g8VbSuBhY/iKWs6SIOSwoXYdv89D0THTSQQtsSC7J+4= +github.com/couchbase/gocb/v2 v2.9.2/go.mod h1:nCHmyzEiB5FmAzuNNkXtSbXP7wxO7nsYKdQAzm2TvSA= github.com/couchbase/gocbcore/v10 v10.5.1 h1:bwlV/zv/fSQLuO14M9k49K7yWgcWfjSgMyfRGhW1AyU= github.com/couchbase/gocbcore/v10 v10.5.1/go.mod h1:rulbgUK70EuyRUiLQ0LhQAfSI/Rl+jWws8tTbHzvB6M= +github.com/couchbase/gocbcore/v10 v10.5.2 h1:DHK042E1RfhPBR3b14CITl5XHRsLjH3hpERuwUc5UIg= +github.com/couchbase/gocbcore/v10 v10.5.2/go.mod h1:rulbgUK70EuyRUiLQ0LhQAfSI/Rl+jWws8tTbHzvB6M= github.com/couchbase/gocbcoreps v0.1.3 h1:fILaKGCjxFIeCgAUG8FGmRDSpdrRggohOMKEgO9CUpg= github.com/couchbase/gocbcoreps v0.1.3/go.mod h1:hBFpDNPnRno6HH5cRXExhqXYRmTsFJlFHQx7vztcXPk= github.com/couchbase/goprotostellar v1.0.2 h1:yoPbAL9sCtcyZ5e/DcU5PRMOEFaJrF9awXYu3VPfGls= @@ -24,6 +28,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.9.0 h1:pTK/l/3qYIKaRXuHnEnIf7Y5NxfRPfpb7dis6/gdlVI= github.com/dlclark/regexp2 v1.9.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= +github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -67,6 +73,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= @@ -76,6 +84,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= @@ -120,6 +130,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -140,22 +152,40 @@ go.k6.io/k6 v0.53.0 h1:vedyH0gkWp3/roSfgAWhTRk9m5CXia3Is9KuZGKOWYg= go.k6.io/k6 v0.53.0/go.mod h1:6eKR5DkEx8jHLUN2EswaF0qmk9wFtgX/4yvlPdKTEwk= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 h1:dIIDULZJpgdiHz5tXrTgKIMLkus6jEFa7x5SOKcyR7E= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0/go.mod h1:jlRVBe7+Z1wyxFSUs48L6OBQZ5JwH2Hg/Vbl+t9rAgI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 h1:nSiV3s7wiCam610XcLbYOmMfJxB9gO4uK3Xgv5gmTgg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0/go.mod h1:hKn/e/Nmd19/x1gvIHwtOwVWM+VhuITSWip3JUDghj0= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 h1:Xw8U6u2f8DK2XAkGRFV7BBLENgnTGX9i4rQRxJf+/vs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0/go.mod h1:6KW1Fm6R/s6Z3PGXwSJN2K4eT6wQB3vXX6CVnYX9NmM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0 h1:JAv0Jwtl01UFiyWZEMiJZBiTlv5A50zNs8lsthXqIio= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0/go.mod h1:QNKLmUEAq2QUbPQUfvw4fmv0bgbK7UlOSFCnXyfvSNc= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -169,6 +199,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -189,6 +220,8 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -212,12 +245,18 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -239,8 +278,12 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4= google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE= +google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c h1:e0zB268kOca6FbuJkYUGxfwG4DKFZG/8DLyv9Zv66cE= +google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c/go.mod h1:fO8wJzT2zbQbAjbIoos1285VfEIYKDDY+Dt+WpTkh6g= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c h1:Kqjm4WpoWvwhMPcrAczoTyMySQmYa9Wy2iL6Con4zn8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -248,6 +291,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From 236da7343846e937a465e871a8af11ce6d159d65 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Tue, 15 Oct 2024 23:59:33 -0700 Subject: [PATCH 08/12] refactor connection management - remove unused code --- couchbase.go | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/couchbase.go b/couchbase.go index 2fcc3d0..7635031 100644 --- a/couchbase.go +++ b/couchbase.go @@ -327,36 +327,3 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e return result, nil } - -// // TODO: Use gotdotenv or viper pkgs. -// func getenvStringValue(key string) (string, error) { -// value := os.Getenv(key) -// if value == "" { -// return value, errors.New("failed to getenv string value") -// } -// return value, nil -// } - -// func getenvDurationValue(key string, defaultValue time.Duration) time.Duration { -// value, err := getenvStringValue(key) -// if err != nil { -// return defaultValue -// } -// duration, err := time.ParseDuration(value) -// if err != nil { -// return defaultValue -// } -// return duration -// } - -// func getenvBoolValue(key string, defaultValue bool) bool { -// s, err := getenvStringValue(key) -// if err != nil { -// return defaultValue -// } -// value, err := strconv.ParseBool(s) -// if err != nil { -// return defaultValue -// } -// return value -// } From 649a390d4dbe2d1edd01836564751b24a5110328 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Wed, 16 Oct 2024 21:40:32 -0700 Subject: [PATCH 09/12] refactor connection management - interface clean-ups --- couchbase.go | 251 ++++++++++++++++++++++++--------------------------- 1 file changed, 116 insertions(+), 135 deletions(-) diff --git a/couchbase.go b/couchbase.go index 7635031..e06822e 100644 --- a/couchbase.go +++ b/couchbase.go @@ -14,43 +14,33 @@ func init() { } const ( - // TODO: Define these as API constructs. Const/env variables to have backward compatibility. - // doConnectionPerVUEnvKey = "XK6_COUCHBASE_DO_CONN_PER_VU" - // bucketReadinessTimeout = "XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT" defaultBucketReadinessTimeout = 5 * time.Second - defaultDoConnectionPerVU = false + defaultDoConnectionPerVU = true ) var ( singletonClient *Client + errz error once sync.Once ) type CouchBase struct{} -type Options struct { - doConnectionPerVU bool - bucketReadinessTimeout time.Duration - bucketsToWarm []string -} - -func newDefaultOptions() Options { - return Options{ - doConnectionPerVU: defaultDoConnectionPerVU, - bucketReadinessTimeout: defaultBucketReadinessTimeout, - } +type options struct { + DoConnectionPerVU bool `json:"do_connection_per_vu,omitempty"` + BucketReadinessTimeout time.Duration `json:"bucket_readiness_timeout,omitempty"` + BucketsToWarm []string `json:"buckets_to_warm,omitempty"` } type DBConfig struct { Hostname string `json:"connection_string,omitempty"` - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` + Username string `json:"-"` + Password string `json:"-"` } type Client struct { cluster *gocb.Cluster - errz error - options Options + options options // Key: bucketName (string) // Value: *gocb.Bucket @@ -58,61 +48,39 @@ type Client struct { mu sync.Mutex } -func instantiateNewConnection(dbConfig DBConfig, options Options) *Client { - // For a secure cluster connection, use `couchbases://` instead. - cluster, err := gocb.Connect("couchbase://"+dbConfig.Hostname, gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: dbConfig.Username, - Password: dbConfig.Password, - }, - TimeoutsConfig: gocb.TimeoutsConfig{ - ConnectTimeout: 95 * time.Second, - QueryTimeout: 95 * time.Second, - SearchTimeout: 95 * time.Second, - }, - }) - if err != nil { - return &Client{errz: err, options: options} +func (c *CouchBase) NewClientPerVU(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string) (*Client, error) { + opts := options{ + DoConnectionPerVU: true, + BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), + BucketsToWarm: bucketsToWarm, } - - client := &Client{cluster: cluster, options: options} - for _, bucket := range options.bucketsToWarm { - err := client.readyBucket(bucket) - if err != nil { - return &Client{errz: err, options: options} - } - } - return client + return c.NewWithOptions(dbConfig, opts) } -func getCouchbaseInstance(dbConfig DBConfig, options Options) *Client { - if options.doConnectionPerVU { - return instantiateNewConnection(dbConfig, options) +func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string) (*Client, error) { + opts := options{ + DoConnectionPerVU: false, + BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), + BucketsToWarm: bucketsToWarm, } - once.Do( - func() { - singletonClient = instantiateNewConnection(dbConfig, options) - }, - ) - return singletonClient + return c.NewWithOptions(dbConfig, opts) } -func (*CouchBase) NewOptions(doConnectionPerVU bool, bucketReadinessTimeout string, bucketsToWarm []string) interface{} { - - readinessDuration, err := time.ParseDuration(bucketReadinessTimeout) +func (c *CouchBase) NewWithOptions(dbConfig DBConfig, opts options) (*Client, error) { + client, err := getCouchbaseInstance(dbConfig, opts) if err != nil { - return fmt.Errorf("failed to parse readiness timeout %v. Err: %w", bucketReadinessTimeout, err) - } - return Options{ - doConnectionPerVU: doConnectionPerVU, - bucketReadinessTimeout: readinessDuration, - bucketsToWarm: bucketsToWarm, + return nil, fmt.Errorf("failed to create new couchbase connection with options for cluster %s. Err: %w", dbConfig.Hostname, err) } -} -func (*CouchBase) NewWithOptions(dbConfig DBConfig, options Options) *Client { - return getCouchbaseInstance(dbConfig, options) + // Optionally warm the bucket on client's request + for _, bucket := range opts.BucketsToWarm { + _, err := client.connectBucketOrLoad(bucket) + if err != nil { + return nil, fmt.Errorf("failed to connect to bucket :%s, Err: %w", bucket, err) + } + } + return client, nil } func (*CouchBase) NewClient(connectionString string, username string, password string) interface{} { @@ -121,79 +89,15 @@ func (*CouchBase) NewClient(connectionString string, username string, password s Username: username, Password: password, } - client := getCouchbaseInstance(dbConfig, newDefaultOptions()) - if client.errz != nil { - return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, client.errz) + opts := options{ + DoConnectionPerVU: defaultDoConnectionPerVU, + BucketReadinessTimeout: defaultBucketReadinessTimeout, } - return client -} - -func (c *Client) readyBucket(bucketName string) error { - bucket, found := c.bucketsConnections.Load(bucketName) - if !found || bucket == nil { - // Create bucket connections - c.mu.Lock() - defer c.mu.Unlock() - bucket, found := c.bucketsConnections.Load(bucketName) - if found && bucket != nil { - return nil - } - - // TODO: Add retries. - newBucket := c.cluster.Bucket(bucketName) - err := newBucket.WaitUntilReady(c.options.bucketReadinessTimeout, nil) - if err != nil { - return fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.bucketReadinessTimeout, err) - } - c.bucketsConnections.Store(bucketName, newBucket) - } - return nil -} - -// TODO: Create bucket connections on inits and remove mutex. -func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { - err := c.readyBucket(bucketName) + client, err := getCouchbaseInstance(dbConfig, opts) if err != nil { - return nil, fmt.Errorf("failed to ready bucket %s, Err: %w", bucketName, err) - } - bucket, loaded := c.bucketsConnections.Load(bucketName) - if !loaded { - return nil, fmt.Errorf("failed to load bucket %s", bucketName) + return fmt.Errorf("failed to connect to couchase cluster %s. Err: %w", connectionString, err) } - return bucket.(*gocb.Bucket), nil - - // bucket, found := c.bucketsConnections.Load(bucketName) - // if !found || bucket == nil { - // // Create bucket connections - // c.mu.Lock() - // defer c.mu.Unlock() - // bucket, found := c.bucketsConnections.Load(bucketName) - // if found && bucket != nil { - // return bucket.(*gocb.Bucket), nil - // } - - // // TODO: Add retries. - // newBucket := c.cluster.Bucket(bucketName) - // err := newBucket.WaitUntilReady(c.options.bucketReadinessTimeout, nil) - // if err != nil { - // return nil, fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.bucketReadinessTimeout, err) - // } - // c.bucketsConnections.Store(bucketName, newBucket) - // bucket, loaded := c.bucketsConnections.Load(bucketName) - // if !loaded { - // return nil, fmt.Errorf("failed to load bucket %s", bucketName) - // } - // return bucket.(*gocb.Bucket), nil - // } - // return bucket.(*gocb.Bucket), nil -} - -func (c *Client) HasError() bool { - return c.errz != nil -} - -func (c *Client) GetError() string { - return c.errz.Error() + return client } func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error { @@ -287,7 +191,7 @@ func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, erro var result interface{} bucket, err := c.getBucket(bucketName) if err != nil { - return result, fmt.Errorf("failed to create bucket connection for findOne. Err: %w", err) + return result, fmt.Errorf("failed to get bucket connection for findOne. Err: %w", err) } bucketScope := bucket.Scope(scope) @@ -327,3 +231,80 @@ func (c *Client) FindByPreparedStmt(query string, params ...interface{}) (any, e return result, nil } + +func (c *Client) Close() error { + opts := gocb.ClusterCloseOptions{} + return c.cluster.Close(&opts) +} + +// TODO: Create bucket connections on inits and remove mutex. +func (c *Client) getBucket(bucketName string) (*gocb.Bucket, error) { + return c.connectBucketOrLoad(bucketName) +} + +func (c *Client) connectBucketOrLoad(bucketName string) (*gocb.Bucket, error) { + bucket, found := c.bucketsConnections.Load(bucketName) + if !found || bucket == nil { + // Create bucket connections + // Mutex Lock to ensure that the bucket is instantiated only once in shared cluster connection mode. + c.mu.Lock() + defer c.mu.Unlock() + bucket, found := c.bucketsConnections.Load(bucketName) + if found && bucket != nil { + return bucket.(*gocb.Bucket), nil + } + + newBucket := c.cluster.Bucket(bucketName) + err := newBucket.WaitUntilReady(c.options.BucketReadinessTimeout, nil) + if err != nil { + return nil, fmt.Errorf("failed to wait for bucket %s, timeout: %v. Err: %w", bucketName, c.options.BucketReadinessTimeout, err) + } + c.bucketsConnections.Store(bucketName, newBucket) + return newBucket, nil + + } + return bucket.(*gocb.Bucket), nil +} + +func getCouchbaseInstance(dbConfig DBConfig, opts options) (*Client, error) { + if opts.DoConnectionPerVU { + return instantiateNewConnection(dbConfig, opts) + } + + once.Do( + func() { + client, err := instantiateNewConnection(dbConfig, opts) + if err != nil { + errz = err + return + } + singletonClient = client + }, + ) + return singletonClient, errz +} + +func instantiateNewConnection(dbConfig DBConfig, options options) (*Client, error) { + // For a secure cluster connection, use `couchbases://` instead. + cluster, err := gocb.Connect("couchbase://"+dbConfig.Hostname, gocb.ClusterOptions{ + Authenticator: gocb.PasswordAuthenticator{ + Username: dbConfig.Username, + Password: dbConfig.Password, + }, + // TODO: Set timeoutConfig + }) + if err != nil { + return nil, fmt.Errorf("faile to instantiate new connection to couchbase cluster %s. Err: %w", dbConfig.Hostname, err) + } + + return &Client{cluster: cluster, options: options}, nil +} + +func parseStringToDuration(bucketReadinessDuration string) time.Duration { + readinessDuration, err := time.ParseDuration(bucketReadinessDuration) + if err != nil { + readinessDuration = defaultBucketReadinessTimeout + } + + return readinessDuration +} From be9a683d71f28e1db78f7824b8ca2e88539d0963 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Thu, 17 Oct 2024 09:28:36 -0700 Subject: [PATCH 10/12] refactor connection management - add read mes --- README.md | 6 ------ couchbase.go | 17 ++++++++--------- examples/test-new-with-conn-per-vu.js | 11 +++++++++++ examples/test-new-with-shared-conn.js | 11 +++++++++++ 4 files changed, 30 insertions(+), 15 deletions(-) create mode 100644 examples/test-new-with-conn-per-vu.js create mode 100644 examples/test-new-with-shared-conn.js diff --git a/README.md b/README.md index 4f4121c..e70c19d 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,6 @@ xk6 build --output xk6-couchbase --with github.com/thotasrinath/xk6-couchbase=. xk6 build --with github.com/thotasrinath/xk6-couchbase@latest ``` -#### Note - -By default, a new connection is created per VU. -To share the connection across multiple queries set the environment variable XK6_COUCHBASE_DO_CONN_PER_VU to true -Bucket WaitUntilReady timeout can be set using XK6_COUCHBASE_BUCKET_READINESS_TIMEOUT=20s - ## Examples ### Document Insertion Test ```js diff --git a/couchbase.go b/couchbase.go index e06822e..dd64399 100644 --- a/couchbase.go +++ b/couchbase.go @@ -54,7 +54,7 @@ func (c *CouchBase) NewClientPerVU(dbConfig DBConfig, bucketsToWarm []string, bu BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), BucketsToWarm: bucketsToWarm, } - return c.NewWithOptions(dbConfig, opts) + return c.NewClientWithOptions(dbConfig, opts) } func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string) (*Client, error) { @@ -64,10 +64,10 @@ func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWa BucketsToWarm: bucketsToWarm, } - return c.NewWithOptions(dbConfig, opts) + return c.NewClientWithOptions(dbConfig, opts) } -func (c *CouchBase) NewWithOptions(dbConfig DBConfig, opts options) (*Client, error) { +func (c *CouchBase) NewClientWithOptions(dbConfig DBConfig, opts options) (*Client, error) { client, err := getCouchbaseInstance(dbConfig, opts) if err != nil { return nil, fmt.Errorf("failed to create new couchbase connection with options for cluster %s. Err: %w", dbConfig.Hostname, err) @@ -100,7 +100,7 @@ func (*CouchBase) NewClient(connectionString string, username string, password s return client } -func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) error { +func (c *Client) Insert(bucketName string, scope string, collection string, docId string, doc any) error { bucket, err := c.getBucket(bucketName) if err != nil { return fmt.Errorf("failed to create bucket connection for insert. Err: %w", err) @@ -113,7 +113,7 @@ func (c *Client) Insert(bucketName, scope, collection, docId string, doc any) er return nil } -func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) error { +func (c *Client) Upsert(bucketName string, scope string, collection string, docId string, doc any) error { bucket, err := c.getBucket(bucketName) if err != nil { return fmt.Errorf("failed to create bucket connection for upsert. Err: %w", err) @@ -126,7 +126,7 @@ func (c *Client) Upsert(bucketName, scope, collection, docId string, doc any) er return nil } -func (c *Client) Remove(bucketName, scope, collection, docId string) error { +func (c *Client) Remove(bucketName string, scope string, collection string, docId string) error { bucket, err := c.getBucket(bucketName) if err != nil { return fmt.Errorf("failed to create bucket connection for remove. Err: %w", err) @@ -144,7 +144,7 @@ func (c *Client) Remove(bucketName, scope, collection, docId string) error { return nil } -func (c *Client) InsertBatch(bucketName, scope, collection string, docs map[string]any) error { +func (c *Client) InsertBatch(bucketName string, scope string, collection string, docs map[string]any) error { bucket, err := c.getBucket(bucketName) if err != nil { return fmt.Errorf("failed to create bucket connection for insertBatch. Err: %w", err) @@ -187,14 +187,13 @@ func (c *Client) Find(query string) (any, error) { return result, nil } -func (c *Client) FindOne(bucketName, scope, collection, docId string) (any, error) { +func (c *Client) FindOne(bucketName string, scope string, collection string, docId string) (any, error) { var result interface{} bucket, err := c.getBucket(bucketName) if err != nil { return result, fmt.Errorf("failed to get bucket connection for findOne. Err: %w", err) } bucketScope := bucket.Scope(scope) - getResult, err := bucketScope.Collection(collection).Get(docId, nil) if err != nil { return result, err diff --git a/examples/test-new-with-conn-per-vu.js b/examples/test-new-with-conn-per-vu.js new file mode 100644 index 0000000..93e38bf --- /dev/null +++ b/examples/test-new-with-conn-per-vu.js @@ -0,0 +1,11 @@ +import xk6_couchbase from 'k6/x/couchbase'; + + +const dbConfig = { hostname: 'localhost', username: '', password: '' }; +const bucketsToPreWarm = ['test']; +const client = xk6_couchbase.newClientPerVU(dbConfig, bucketsToPreWarm, "5s"); +export default () => { + // syntax :: client.findOne("", "", "", ""); + var res = client.findOne("test", "_default", "_default", "002wPJwiJArcUpz"); + console.log(res); +} diff --git a/examples/test-new-with-shared-conn.js b/examples/test-new-with-shared-conn.js new file mode 100644 index 0000000..d5185b6 --- /dev/null +++ b/examples/test-new-with-shared-conn.js @@ -0,0 +1,11 @@ +import xk6_couchbase from 'k6/x/couchbase'; + + +const dbConfig = { hostname: 'localhost', username: '', password: '' }; +const bucketsToPreWarm = ['test']; +const client = xk6_couchbase.newClientWithSharedConnection(dbConfig, bucketsToPreWarm, "5s"); +export default () => { + // syntax :: client.findOne("", "", "", ""); + var res = client.findOne("test", "_default", "_default", "002wPJwiJArcUpz"); + console.log(res); +} From ff044202ad0d9e074692238786455d20dcdad779 Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Fri, 18 Oct 2024 16:21:45 -0700 Subject: [PATCH 11/12] refactor connection management - add exists func --- couchbase.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/couchbase.go b/couchbase.go index dd64399..cf69f66 100644 --- a/couchbase.go +++ b/couchbase.go @@ -187,6 +187,19 @@ func (c *Client) Find(query string) (any, error) { return result, nil } +func (c *Client) Exists(bucketName string, scope string, collection string, docId string) error { + bucket, err := c.getBucket(bucketName) + if err != nil { + return fmt.Errorf("failed to get bucket connection for findOne. Err: %w", err) + } + bucketScope := bucket.Scope(scope) + _, err = bucketScope.Collection(collection).Exists(docId, nil) + if err != nil { + return err + } + return nil +} + func (c *Client) FindOne(bucketName string, scope string, collection string, docId string) (any, error) { var result interface{} bucket, err := c.getBucket(bucketName) From 068f948a0fa7c0977b8292b14b85446e8deb992d Mon Sep 17 00:00:00 2001 From: Sharath Jagannath Date: Mon, 21 Oct 2024 09:14:05 -0700 Subject: [PATCH 12/12] refactor connection management - add connection buffer size --- couchbase.go | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/couchbase.go b/couchbase.go index cf69f66..1d2d43a 100644 --- a/couchbase.go +++ b/couchbase.go @@ -14,8 +14,9 @@ func init() { } const ( - defaultBucketReadinessTimeout = 5 * time.Second - defaultDoConnectionPerVU = true + defaultBucketReadinessTimeout = 5 * time.Second + defaultDoConnectionPerVU = true + defaultConnectionBufferSizeBytes = 2048 ) var ( @@ -27,9 +28,10 @@ var ( type CouchBase struct{} type options struct { - DoConnectionPerVU bool `json:"do_connection_per_vu,omitempty"` - BucketReadinessTimeout time.Duration `json:"bucket_readiness_timeout,omitempty"` - BucketsToWarm []string `json:"buckets_to_warm,omitempty"` + DoConnectionPerVU bool `json:"do_connection_per_vu,omitempty"` + BucketReadinessTimeout time.Duration `json:"bucket_readiness_timeout,omitempty"` + BucketsToWarm []string `json:"buckets_to_warm,omitempty"` + ConnectionBufferSizeBytes int `json:"connection_buffer_size_bytes,omitempty"` } type DBConfig struct { @@ -48,26 +50,31 @@ type Client struct { mu sync.Mutex } -func (c *CouchBase) NewClientPerVU(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string) (*Client, error) { +func (c *CouchBase) NewClientPerVU(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string, connectionBufferSizeBytes int) (*Client, error) { opts := options{ - DoConnectionPerVU: true, - BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), - BucketsToWarm: bucketsToWarm, + DoConnectionPerVU: true, + BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), + BucketsToWarm: bucketsToWarm, + ConnectionBufferSizeBytes: connectionBufferSizeBytes, } return c.NewClientWithOptions(dbConfig, opts) } -func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string) (*Client, error) { +func (c *CouchBase) NewClientWithSharedConnection(dbConfig DBConfig, bucketsToWarm []string, bucketReadinessDuration string, connectionBufferSizeBytes int) (*Client, error) { opts := options{ - DoConnectionPerVU: false, - BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), - BucketsToWarm: bucketsToWarm, + DoConnectionPerVU: false, + BucketReadinessTimeout: parseStringToDuration(bucketReadinessDuration), + BucketsToWarm: bucketsToWarm, + ConnectionBufferSizeBytes: connectionBufferSizeBytes, } return c.NewClientWithOptions(dbConfig, opts) } func (c *CouchBase) NewClientWithOptions(dbConfig DBConfig, opts options) (*Client, error) { + if opts.ConnectionBufferSizeBytes < 1 { + opts.ConnectionBufferSizeBytes = defaultConnectionBufferSizeBytes + } client, err := getCouchbaseInstance(dbConfig, opts) if err != nil { return nil, fmt.Errorf("failed to create new couchbase connection with options for cluster %s. Err: %w", dbConfig.Hostname, err) @@ -298,7 +305,9 @@ func getCouchbaseInstance(dbConfig DBConfig, opts options) (*Client, error) { func instantiateNewConnection(dbConfig DBConfig, options options) (*Client, error) { // For a secure cluster connection, use `couchbases://` instead. - cluster, err := gocb.Connect("couchbase://"+dbConfig.Hostname, gocb.ClusterOptions{ + connStr := fmt.Sprintf("couchbase://%s?kv_buffer_size=2048", dbConfig.Hostname) + // connStr := "couchbase://"+dbConfig.Hostname + cluster, err := gocb.Connect(connStr, gocb.ClusterOptions{ Authenticator: gocb.PasswordAuthenticator{ Username: dbConfig.Username, Password: dbConfig.Password,