Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

partially resolve #100 (bucket_stat) #113

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ REFACTOR:
* Use constants for vshard error names and codes.
* Reduce SLOC by using CallAsync method.
* BucketForceCreate optimization: don't decode tnt response.
* Remove bucketStatError type, use StorageCallVShardError type instead.
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).

TESTS:
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.
Expand Down
4 changes: 2 additions & 2 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl

for _, rsFuture := range rsFutures {
if _, err := bucketStatWait(rsFuture.future); err != nil {
var bsError bucketStatError
if !errors.As(err, &bsError) {
var vshardError StorageCallVShardError
if !errors.As(err, &vshardError) {
r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err)
}
// just skip, bucket may not belong to this replicaset
Expand Down
14 changes: 0 additions & 14 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,6 @@ const (
VShardErrNameInstanceNameMismatch = "INSTANCE_NAME_MISMATCH"
)

type bucketStatError struct {
BucketID uint64 `msgpack:"bucket_id"`
Reason string `msgpack:"reason"`
Code int `msgpack:"code"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Name string `msgpack:"name"`
}

func (bse bucketStatError) Error() string {
type alias bucketStatError
return fmt.Sprintf("%+v", alias(bse))
}

func newVShardErrorNoRouteToBucket(bucketID uint64) error {
return &StorageCallVShardError{
Name: VShardErrNameNoRouteToBucket,
Expand Down
71 changes: 51 additions & 20 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

type ReplicasetInfo struct {
Expand Down Expand Up @@ -51,41 +52,71 @@ func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tar
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bsInfo BucketStatInfo
type vshardStorageBucketStatResponseProto struct {
ok bool
info BucketStatInfo
err StorageCallVShardError
}

respData, err := future.Get()
func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
// bucket_stat returns pair: stat, err
// https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413

respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return bsInfo, err
return err
}

if len(respData) == 0 {
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: empty response")
if respArrayLen == 0 {
return fmt.Errorf("protocol violation bucketStatWait: empty response")
}

if respData[0] == nil {
if len(respData) != 2 {
return bsInfo, fmt.Errorf("protocol violation bucketStatWait: invalid response length %d when respData[0] is nil", len(respData))
code, err := d.PeekCode()
if err != nil {
return err
}

if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}

if respArrayLen != 2 {
return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen)
}

var bsError bucketStatError
err = mapstructure.Decode(respData[1], &bsError)
err = d.Decode(&r.err)
if err != nil {
// We could not decode respData[1] as bsError, so return respData[1] as is, add info why we could not decode.
return bsInfo, fmt.Errorf("bucketStatWait error: %v (can't decode into bsError: %v)", respData[1], err)
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}

return bsInfo, bsError
return nil
}

// A problem with key-code 1
// todo: fix after https://github.com/tarantool/go-tarantool/issues/368
err = mapstructure.Decode(respData[0], &bsInfo)
err = d.Decode(&r.info)
if err != nil {
return bsInfo, fmt.Errorf("can't decode bsInfo: %w", err)
return fmt.Errorf("failed to decode bucket stat info: %w", err)
}

r.ok = true

return nil
}

func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bucketStatResponse vshardStorageBucketStatResponseProto

err := future.GetTyped(&bucketStatResponse)
if err != nil {
return BucketStatInfo{}, err
}

if !bucketStatResponse.ok {
return BucketStatInfo{}, bucketStatResponse.err
}

return bsInfo, nil
return bucketStatResponse.info, nil
}

// ReplicaCall perform function on remote storage
Expand Down
45 changes: 43 additions & 2 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"github.com/snksoft/crc"
"github.com/vmihailenco/msgpack/v5"

tarantool "github.com/tarantool/go-tarantool/v2"
)
Expand Down Expand Up @@ -122,8 +123,48 @@ type Config struct {
}

type BucketStatInfo struct {
BucketID uint64 `mapstructure:"id"`
Status string `mapstructure:"status"`
BucketID uint64 `msgpack:"id"`
Status string `msgpack:"status"`
}

// tnt vshard storage returns map with 'int' keys for bucketStatInfo,
// example: map[id:48 status:active 1:48 2:active].
// But msgpackv5 supports only string keys when decoding maps into structs,
// see issue: https://github.com/vmihailenco/msgpack/issues/372
// To workaround this we decode BucketStatInfo manually.
// When the issue above will be resolved, this code can be (and should be) deleted.
func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error {
nKeys, err := d.DecodeMapLen()
if err != nil {
return err
}

for i := 0; i < nKeys; i++ {
key, err := d.DecodeInterface()
if err != nil {
return err
}

keyName, _ := key.(string)
switch keyName {
case "id":
if err := d.Decode(&bsi.BucketID); err != nil {
return err
}
case "status":
if err := d.Decode(&bsi.Status); err != nil {
return err
}
default:
// skip unused value
if err := d.Skip(); err != nil {
return err
}
}

}

return nil
}

type InstanceInfo struct {
Expand Down
Loading