Skip to content

Commit

Permalink
coreservice的db调用调整为传入租户 (#8286)
Browse files Browse the repository at this point in the history
--story=120467693
  • Loading branch information
bd-xiaowang authored Dec 5, 2024
1 parent ee72737 commit 7004b81
Show file tree
Hide file tree
Showing 94 changed files with 1,754 additions and 1,532 deletions.
5 changes: 2 additions & 3 deletions src/common/util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ import (
"configcenter/src/common"
"configcenter/src/common/errors"
"configcenter/src/common/http/rest"
"configcenter/src/storage/dal"
"configcenter/src/storage/driver/mongodb"
)

// ConvDBInsertError convert db insert error to
func ConvDBInsertError(kit *rest.Kit, db dal.RDB, err error) errors.CCErrorCoder {
func ConvDBInsertError(kit *rest.Kit, err error) errors.CCErrorCoder {
if err == nil {
return nil
}

if db.IsDuplicatedError(err) {
if mongodb.IsDuplicatedError(err) {
return kit.CCError.CCErrorf(common.CCErrCommDuplicateItem, mongodb.GetDuplicateKey(err))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"configcenter/src/common/auth"
"configcenter/src/common/blog"
httpheader "configcenter/src/common/http/header"
"configcenter/src/common/http/rest"
"configcenter/src/common/mapstr"
"configcenter/src/common/metadata"
"configcenter/src/common/resource/esb"
Expand Down Expand Up @@ -223,7 +224,8 @@ func migrateModelInstancePolicy(ctx context.Context, action iamtype.ActionID, db
}

// get objectID by instance IDs to judge if the instances belongs to the object specified
instIDObjMappings, err := instancemapping.GetInstanceObjectMapping(instanceIDs)
kit := rest.NewKit()
instIDObjMappings, err := instancemapping.GetInstanceObjectMapping(kit, instanceIDs)
if err != nil {
blog.Errorf("get instance object mapping from instance ids(%+v) failed, err: %v", instanceIDs, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"configcenter/pkg/cache/general"
"configcenter/src/common"
"configcenter/src/common/blog"
"configcenter/src/common/http/rest"
"configcenter/src/common/mapstr"
"configcenter/src/common/metadata"
"configcenter/src/common/util"
Expand Down Expand Up @@ -61,7 +62,8 @@ func parseObjInstData(data dataWithTable[mapstr.MapStr]) (*basicInfo, error) {
return nil, fmt.Errorf("parse id %+v failed, err: %v", data.Data[common.BKInstIDField], err)
}

instObjMappings, err := instancemapping.GetInstanceObjectMapping([]int64{instID})
kit := rest.NewKit()
instObjMappings, err := instancemapping.GetInstanceObjectMapping(kit, []int64{instID})
if err != nil {
return nil, fmt.Errorf("get object ids from instance ids(%d) failed, err: %v", instID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/source_controller/cacheservice/event/watch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (c *Client) getDetailsByOids(kit *rest.Kit, oids []primitive.ObjectID, fiel
return oidDetailMap, nil

case common.BKTableNameBaseInst, common.BKTableNameMainlineInstance:
instObjMappings, err := instancemapping.GetInstanceObjectMapping(instIDs)
instObjMappings, err := instancemapping.GetInstanceObjectMapping(kit, instIDs)
if err != nil {
blog.Errorf("get object ids from instance ids(%+v) failed, err: %v, rid: %s", instIDs, err, kit.Rid)
return nil, err
Expand Down
15 changes: 2 additions & 13 deletions src/source_controller/coreservice/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,13 @@ func initResource(coreSvr *CoreServer, op *options.ServerOption) error {
}

coreSvr.Config.Mongo.DisableInsert = op.DisableInsert
dbErr := mongodb.InitClient("", &coreSvr.Config.Mongo)
if dbErr != nil {
blog.Errorf("failed to connect the db server, error info is %s", dbErr.Error())
return dbErr
}

cacheRrr := redis.InitClient("redis", &coreSvr.Config.Redis)
if cacheRrr != nil {
blog.Errorf("new redis client failed, err: %v", cacheRrr)
return cacheRrr
}

initErr := mongodb.Client().InitTxnManager(redis.Client())
if initErr != nil {
blog.Errorf("failed to init txn manager, error info is %v", initErr)
return initErr
}

cryptoConf, err := cc.Crypto("crypto")
if err != nil {
blog.Errorf("get crypto config failed, err: %v", err)
Expand All @@ -145,12 +134,12 @@ func initResource(coreSvr *CoreServer, op *options.ServerOption) error {
err = mongodb.SetShardingCli("", &coreSvr.Config.Mongo, cryptoConf)
if err != nil {
blog.Errorf("new mongodb client failed, err: %v", err)
return dbErr
return err
}

if err = mongodb.Dal().InitTxnManager(redis.Client()); err != nil {
blog.Errorf("init txn manager failed, err: %v", err)
return initErr
return err
}

return nil
Expand Down
44 changes: 22 additions & 22 deletions src/source_controller/coreservice/core/association/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *associationInstance) searchInstanceAssociation(kit *rest.Kit, objID str

results := make([]metadata.InstAsst, 0)
asstTableName := common.GetObjectInstAsstTableName(objID, kit.TenantID)
instHandler := mongodb.Client().Table(asstTableName).Find(param.Condition).Fields(param.Fields...)
instHandler := mongodb.Shard(kit.ShardOpts()).Table(asstTableName).Find(param.Condition).Fields(param.Fields...)
err := instHandler.Start(uint64(param.Page.Start)).Limit(uint64(param.Page.Limit)).
Sort(param.Page.Sort).All(kit.Ctx, &results)
return results, err
Expand All @@ -68,7 +68,7 @@ func (m *associationInstance) searchInstanceAssociation(kit *rest.Kit, objID str
func (m *associationInstance) countInstanceAssociation(kit *rest.Kit, objID string, cond mapstr.MapStr) (uint64,
error) {
asstTableName := common.GetObjectInstAsstTableName(objID, kit.TenantID)
return mongodb.Client().Table(asstTableName).Find(cond).Count(kit.Ctx)
return mongodb.Shard(kit.ShardOpts()).Table(asstTableName).Find(cond).Count(kit.Ctx)
}

func (m *associationInstance) checkAssociationMapping(kit *rest.Kit, objAsstID string, instID int64,
Expand Down Expand Up @@ -139,7 +139,7 @@ func (m *associationInstance) checkAssociationMapping(kit *rest.Kit, objAsstID s
}

func (m *associationInstance) save(kit *rest.Kit, asstInst metadata.InstAsst) (id uint64, err error) {
id, err = mongodb.Client().NextSequence(kit.Ctx, common.BKTableNameInstAsst)
id, err = mongodb.Shard(kit.SysShardOpts()).NextSequence(kit.Ctx, common.BKTableNameInstAsst)
if err != nil {
return id, kit.CCError.New(common.CCErrObjectDBOpErrno, err.Error())
}
Expand All @@ -148,7 +148,7 @@ func (m *associationInstance) save(kit *rest.Kit, asstInst metadata.InstAsst) (i
asstInst.TenantID = kit.TenantID

objInstAsstTableName := common.GetObjectInstAsstTableName(asstInst.ObjectID, kit.TenantID)
err = mongodb.Client().Table(objInstAsstTableName).Insert(kit.Ctx, asstInst)
err = mongodb.Shard(kit.ShardOpts()).Table(objInstAsstTableName).Insert(kit.Ctx, asstInst)
if err != nil {
return id, err
}
Expand All @@ -159,18 +159,18 @@ func (m *associationInstance) save(kit *rest.Kit, asstInst metadata.InstAsst) (i
}

asstObjInstAsstTableName := common.GetObjectInstAsstTableName(asstInst.AsstObjectID, kit.TenantID)
err = mongodb.Client().Table(asstObjInstAsstTableName).Insert(kit.Ctx, asstInst)
err = mongodb.Shard(kit.ShardOpts()).Table(asstObjInstAsstTableName).Insert(kit.Ctx, asstInst)
return id, err
}

func (m *associationInstance) deleteInstanceAssociation(kit *rest.Kit, objID string,
cond mapstr.MapStr) (uint64, error) {
asstInstTableName := common.GetObjectInstAsstTableName(objID, kit.TenantID)
associations := make([]metadata.InstAsst, 0)
if err := mongodb.Client().Table(asstInstTableName).Find(cond).Fields(common.BKObjIDField, common.BKAsstObjIDField).
All(kit.Ctx, &associations); err != nil {
blog.ErrorJSON("delete instance association error. objID: %s, cond: %s, err: %s, rid: %s",
objID, cond, err.Error(), kit.Rid)
if err := mongodb.Shard(kit.ShardOpts()).Table(asstInstTableName).Find(cond).Fields(common.BKObjIDField,
common.BKAsstObjIDField).All(kit.Ctx, &associations); err != nil {
blog.Errorf("delete instance association error. objID: %s, cond: %v, err: %v, rid: %s", objID, cond, err,
kit.Rid)
return 0, err
}

Expand All @@ -192,18 +192,18 @@ func (m *associationInstance) deleteInstanceAssociation(kit *rest.Kit, objID str
objIDMap[asstObjID] = struct{}{}

asstTableName := common.GetObjectInstAsstTableName(asstObjID, kit.TenantID)
err := mongodb.Client().Table(asstTableName).Delete(kit.Ctx, cond)
err := mongodb.Shard(kit.ShardOpts()).Table(asstTableName).Delete(kit.Ctx, cond)
if err != nil {
blog.ErrorJSON("delete instance association error. objID: %s, cond: %s, err: %s, rid: %s",
asstObjID, cond, err.Error(), kit.Rid)
blog.Errorf("delete instance association error. objID: %s, cond: %v, err: %v, rid: %s",
asstObjID, cond, err, kit.Rid)
return 0, err
}
}

cnt, err := mongodb.Client().Table(asstInstTableName).DeleteMany(kit.Ctx, cond)
cnt, err := mongodb.Shard(kit.ShardOpts()).Table(asstInstTableName).DeleteMany(kit.Ctx, cond)
if err != nil {
blog.ErrorJSON("delete instance association error. objID: %s, cond: %s, err: %s, rid: %s",
objID, cond, err.Error(), kit.Rid)
blog.Errorf("delete instance association error. objID: %s, cond: %v, err: %v, rid: %s",
objID, cond, err, kit.Rid)
return 0, err
}
return cnt, nil
Expand Down Expand Up @@ -355,13 +355,13 @@ func (m *associationInstance) checkInstAsstCreateData(kit *rest.Kit, inputParam
checkAssoCond.Element(&mongo.Eq{Key: common.TenantID, Val: kit.TenantID})
assoItems, err := m.search(kit, checkAssoCond)
if err != nil {
blog.ErrorJSON("search associations with condition: %s failed, err: %s, rid: %s",
checkAssoCond.ToMapStr(), err.Error(), kit.Rid)
blog.Errorf("search associations with condition: %v failed, err: %v, rid: %s",
checkAssoCond.ToMapStr(), err, kit.Rid)
return "", err
}

if len(assoItems) != 1 {
blog.ErrorJSON("association with cond: %s not exist, rid: %s", checkAssoCond.ToMapStr(), kit.Rid)
blog.Errorf("association with cond: %v not exist, rid: %s", checkAssoCond.ToMapStr(), kit.Rid)
return "", kit.CCError.CCErrorf(common.CCERrrCoreServiceConcurrent)
}

Expand Down Expand Up @@ -445,8 +445,8 @@ func (m *associationInstance) SearchInstanceAssociation(kit *rest.Kit, objID str
}

instAsstItems, err := m.searchInstanceAssociation(kit, objID, param)
if nil != err {
blog.ErrorJSON("search inst association err: %s, objID: %s, param: %s, rid: %s", err, objID, param, kit.Rid)
if err != nil {
blog.Errorf("search inst association err: %v, objID: %s, param: %s, rid: %s", err, objID, param, kit.Rid)
return nil, err
}

Expand All @@ -455,7 +455,7 @@ func (m *associationInstance) SearchInstanceAssociation(kit *rest.Kit, objID str
// the InstAsst number will be counted by default.
if !param.DisableCounter {
count, err := m.countInstanceAssociation(kit, objID, param.Condition)
if nil != err {
if err != nil {
blog.Errorf("search model instance associations count err: %s, rid: %s", err.Error(), kit.Rid)
return nil, err
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (m *associationInstance) DeleteInstanceAssociation(kit *rest.Kit, objID str
}

cnt, err := m.deleteInstanceAssociation(kit, objID, inputParam.Condition)
if nil != err {
if err != nil {
blog.Errorf("delete inst association [%#v] err [%#v], rid: %s", inputParam.Condition, err, kit.Rid)
return nil, err
}
Expand Down
Loading

0 comments on commit 7004b81

Please sign in to comment.