Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to support single table with flexible support for multiple partition keys. #181

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ err = db.Exec("DELETE FROM orders WHERE product_id = 3").Error
fmt.Println(err) // ErrMissingShardingKey
```

The example demonstrating a single table supporting multiple partitioning strategies is(单表支持多种分表策略的例子在这里)[here](./test/sharding_test.go).

The full example is [here](./examples/order.go).

> 🚨 NOTE: Gorm config `PrepareStmt: true` is not supported for now.
Expand Down
21 changes: 15 additions & 6 deletions conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,26 @@ func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...any)
curTime = time.Now()
)

ftQuery, stQuery, table, err := pool.sharding.resolve(query, args...)
ftQuery, stQuery, table, err := pool.sharding.resolve(ctx, query, args...)
if err != nil {
return nil, err
}

pool.sharding.querys.Store("last_query", stQuery)

if table != "" {
if r, ok := pool.sharding.configs[table]; ok {
key := table
key, err = pool.sharding.getConfigKey(ctx, table)
if err != nil {
return nil, err
}
if r, ok := pool.sharding.configs[key]; ok {
Copy link
Collaborator

@huacnlee huacnlee Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来这个思路是可行,如果能完善,我觉得我们可以引入这个改进。

sharding_key%s_%v 这种在实际应用的时候需要掌握这套规则,不是一个完整的 API 设计。

例如 sharding_key 应该是一个 const,而 key 的我没想到好的解决方法,目前这样,以后其他人使用会存在各种各样的问题,他们很容易遇到无法命中配置。

如果是 sharding_key 已经传了,但 sharding.configs 又找不到对应配置的情况下,应该直接报错。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有道理,这两点我完善一下~

if r.DoubleWrite {
pool.sharding.Logger.Trace(ctx, curTime, func() (sql string, rowsAffected int64) {
result, _ := pool.ConnPool.ExecContext(ctx, ftQuery, args...)
rowsAffected, _ = result.RowsAffected()
if result != nil {
rowsAffected, _ = result.RowsAffected()
}
return pool.sharding.Explain(ftQuery, args...), rowsAffected
}, pool.sharding.Error)
}
Expand All @@ -50,7 +57,9 @@ func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...any)
var result sql.Result
result, err = pool.ConnPool.ExecContext(ctx, stQuery, args...)
pool.sharding.Logger.Trace(ctx, curTime, func() (sql string, rowsAffected int64) {
rowsAffected, _ = result.RowsAffected()
if result != nil {
rowsAffected, _ = result.RowsAffected()
}
return pool.sharding.Explain(stQuery, args...), rowsAffected
}, pool.sharding.Error)

Expand All @@ -63,7 +72,7 @@ func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any
curTime = time.Now()
)

_, stQuery, _, err := pool.sharding.resolve(query, args...)
_, stQuery, _, err := pool.sharding.resolve(ctx, query, args...)
if err != nil {
return nil, err
}
Expand All @@ -80,7 +89,7 @@ func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any
}

func (pool ConnPool) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
_, query, _, _ = pool.sharding.resolve(query, args...)
_, query, _, _ = pool.sharding.resolve(ctx, query, args...)
pool.sharding.querys.Store("last_query", query)

return pool.ConnPool.QueryRowContext(ctx, query, args...)
Expand Down
75 changes: 70 additions & 5 deletions sharding.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sharding

import (
"context"
"errors"
"fmt"
"hash/crc32"
Expand All @@ -15,15 +16,20 @@ import (
)

var (
ErrMissingShardingKey = errors.New("sharding key or id required, and use operator =")
ErrInvalidID = errors.New("invalid id format")
ErrInsertDiffSuffix = errors.New("can not insert different suffix table in one query ")
ErrMissingShardingKey = errors.New("sharding key or id required, and use operator =")
ErrInvalidID = errors.New("invalid id format")
ErrInsertDiffSuffix = errors.New("can not insert different suffix table in one query ")
ErrShardingKeyNotExistInContext = errors.New("the value passed in the context is not the sharding key")
ErrMissingTableName = errors.New("table name is required")
)

var (
ShardingIgnoreStoreKey = "sharding_ignore"
)

// ContextKeyForShardingKey is the context key for sharding key.
const ContextKeyForShardingKey = "sharding_key"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成 ShardingContextKey 与上面的对应,另外把上面那个也改成 const 合并在一起

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的~


type Sharding struct {
*gorm.DB
ConnPool *ConnPool
Expand All @@ -46,6 +52,10 @@ type Config struct {
// For example, for a product order table, you may want to split the rows by `user_id`.
ShardingKey string

// logical table name.Suport multiple table names with same sharding key.
// For example, for user and order table, you may want to shard by `user_id`.
TableNames []string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是必须要的吗?我看上一次都没有这个,这样会增加 config 理解的复杂度,容易产生使用上的歧义。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我准备要合并这个功能,但你得确保尽量少的改动 API,非必要的时候不要引入新的 API 设计。

TableNames 这个我感觉无法接受,这会让整个 Sharding 对于 table 的组织配置变得复杂。最初的设计进需要考虑 sharding_key,而 table 本身是由 Gorm 的 table_name 映射机制来的(这个实际上最初来自 ActiveRecord 的 Model 和 Table 的约定推导方式)。

因此这里不应该存在特别配置 TableNames 的场景,TableNames 始终应该从 Gorm 里面来,如果要自定义,也应当用 Gorm 的 TableName 来实现,而不是在 Sharding 里面处理。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
image
那我把方法暴露出去,注册的逻辑还是保持传map的形式您看这样是否ok


// NumberOfShards specifies how many tables you want to sharding.
NumberOfShards uint

Expand Down Expand Up @@ -110,6 +120,56 @@ func Register(config Config, tables ...any) *Sharding {
}
}

// enables sharding for a single table with flexible support for multiple partition keys.
func RegisterWithKeys(configs []Config) (*Sharding, error) {
mapConfig := make(map[string]Config, len(configs))
for _, config := range configs {
for _, tableName := range config.TableNames {
configKey, err := generateConfigsKey(tableName, config.ShardingKey)
if err != nil {
return nil, err
}
mapConfig[configKey] = config
}
}
return &Sharding{
configs: mapConfig,
}, nil
}

// generates the key for the sharding config.
func generateConfigsKey(tableName, shardingKey string) (string, error) {
// Table name cannot be empty
if tableName == "" {
return "", ErrMissingTableName
}
if shardingKey == "" {
return "", ErrMissingShardingKey
}
return fmt.Sprintf("%s_%s", tableName, shardingKey), nil
}

// get the configs key for using it to get the sharding config.
func (s *Sharding) getConfigKey(ctx context.Context, tableName string) (string, error) {
configKey := tableName
if shardingKey, ok := ctx.Value(ContextKeyForShardingKey).(string); ok {
// If sharding key is set in context, use it to get the sharding config.
configKey = fmt.Sprintf("%s_%s", tableName, shardingKey)
} else {
// If sharding key is not set in context, use the table name as the key.
return configKey, nil
}

// check if the sharding key exists in the configs.
_, exis := s.configs[configKey]
if !exis {
return "", ErrShardingKeyNotExistInContext
}

// If sharding key is not set in context, use the table name as the key.
return configKey, nil
}

func (s *Sharding) compile() error {
if s.configs == nil {
s.configs = make(map[string]Config)
Expand Down Expand Up @@ -297,7 +357,7 @@ func (s *Sharding) switchConn(db *gorm.DB) {
}

// resolve split the old query to full table query and sharding table query
func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) {
func (s *Sharding) resolve(ctx context.Context, query string, args ...any) (ftQuery, stQuery, tableName string, err error) {
ftQuery = query
stQuery = query
if len(s.configs) == 0 {
Expand Down Expand Up @@ -344,7 +404,12 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa
}

tableName = table.Name.Name
r, ok := s.configs[tableName]
key := tableName
key, err = s.getConfigKey(ctx, tableName)
if err != nil {
return
}
r, ok := s.configs[key]
if !ok {
return
}
Expand Down
Loading
Loading