Skip to content

Commit

Permalink
fix: replace hashing select field qulify table name
Browse files Browse the repository at this point in the history
  • Loading branch information
Gowa2017 committed Oct 29, 2024
1 parent 04c39bd commit e436477
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,12 @@ func (s *Sharding) switchConn(db *gorm.DB) {
s.mutex.Unlock()
}
}
func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error {
func replaceConditionTableName(oldTableName, tableName string, expr sqlparser.Expr) error {

err := sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error {
if n, ok := node.(*sqlparser.BinaryExpr); ok {
if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 {
if q.Column.Name == key {
if q.Table.Name == oldTableName {
n.X.(*sqlparser.QualifiedRef).Table.Name = tableName
}
}
Expand All @@ -310,6 +310,22 @@ func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error
}), expr)
return err
}
func replaceSelectFieldTableName(oldTableName, tableName string, columns *sqlparser.OutputNames) *sqlparser.OutputNames {
rcs := []*sqlparser.ResultColumn(*columns)
for i := 0; i < len(rcs); i++ {
rc := rcs[i]
if n, ok := rc.Expr.(*sqlparser.BinaryExpr); ok {
if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 {
if q.Table.Name == oldTableName {
n.X.(*sqlparser.QualifiedRef).Table.Name = tableName
}
}
}

}
r := sqlparser.OutputNames(rcs)
return &r
}

// resolve split the old query to full table query and sharding table query
func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) {
Expand Down Expand Up @@ -454,20 +470,20 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa
ftQuery = stmt.String()
stmt.FromItems = newTable
stmt.OrderBy = replaceOrderByTableName(stmt.OrderBy, tableName, newTable.Name.Name)
replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition)
replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition)
stmt.Columns = replaceSelectFieldTableName(tableName, newTable.Name.Name, stmt.Columns)
stQuery = stmt.String()
case *sqlparser.UpdateStatement:
ftQuery = stmt.String()
stmt.TableName = newTable
replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition)
replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition)
stQuery = stmt.String()
case *sqlparser.DeleteStatement:
ftQuery = stmt.String()
stmt.TableName = newTable
replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition)
replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition)
stQuery = stmt.String()
}
slog.Debug(stQuery)
}

return
Expand Down

0 comments on commit e436477

Please sign in to comment.