From 27c48c030b48405e894669cef2bc475d63f2c003 Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Tue, 6 Aug 2024 15:20:55 +0800 Subject: [PATCH 1/4] support database and upsert write --- core/common/constant.go | 2 ++ core/config/config.go | 3 ++ core/config/resolve.go | 3 ++ core/dbclient/cus_field_milvus2x.go | 4 +++ core/dbclient/milvus2x.go | 49 +++++++++++++++++++++++++-- core/loader/cus_milvus2x_loader.go | 10 ++++-- starter/migration/milvus2x_starter.go | 2 +- storage/milvus2x/milvus2_3_ver.go | 10 +++++- 8 files changed, 76 insertions(+), 7 deletions(-) diff --git a/core/common/constant.go b/core/common/constant.go index e80f1c7..ff42298 100644 --- a/core/common/constant.go +++ b/core/common/constant.go @@ -59,3 +59,5 @@ var LOAD_CHECK_BULK_STATE_INTERVAL = time.Second * 10 //second var LOAD_CHECK_BACKLOG_INTERVAL = time.Second * 10 //second // const SUB_FILE_SIZE = 1024 * 1024 * 512 //512MB const SUB_FILE_SIZE = 1024 * 1024 * 300 + +const UPSERT = "upsert" diff --git a/core/config/config.go b/core/config/config.go index 064540a..02b6fac 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -78,6 +78,9 @@ type Milvus2xConfig struct { GrpcMaxRecvMsgSize int GrpcMaxSendMsgSize int + Database string + WriteMode string //insert or upsert + Version string //internal param hashCache atomic.Uint32 } diff --git a/core/config/resolve.go b/core/config/resolve.go index 5519bd0..a10f79d 100644 --- a/core/config/resolve.go +++ b/core/config/resolve.go @@ -221,6 +221,8 @@ func resolveTargetMilvus2xConfig(v *viper.Viper) *Milvus2xConfig { Password: v.GetString("target.milvus2x.password"), GrpcMaxRecvMsgSize: v.GetInt("target.milvus2x.grpc.maxCallRecvMsgSize"), GrpcMaxSendMsgSize: v.GetInt("target.milvus2x.grpc.maxCallSendMsgSize"), + Database: v.GetString("target.milvus2x.database"), + WriteMode: v.GetString("target.milvus2x.writeMode"), } } @@ -358,5 +360,6 @@ func resolveSourceMilvus2xConfig(v *viper.Viper) *Milvus2xConfig { Password: 
v.GetString("source.milvus2x.password"), GrpcMaxRecvMsgSize: v.GetInt("source.milvus2x.grpc.maxCallRecvMsgSize"), GrpcMaxSendMsgSize: v.GetInt("source.milvus2x.grpc.maxCallSendMsgSize"), + Database: v.GetString("source.milvus2x.database"), } } diff --git a/core/dbclient/cus_field_milvus2x.go b/core/dbclient/cus_field_milvus2x.go index 42a28f1..dbd7594 100644 --- a/core/dbclient/cus_field_milvus2x.go +++ b/core/dbclient/cus_field_milvus2x.go @@ -150,3 +150,7 @@ func (this *CustomFieldMilvus2x) CheckLoadStatus(ctx context.Context, collection func (cus *CustomFieldMilvus2x) StartBatchInsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error { return cus.Milvus2x.StartBatchInsert(ctx, collection, data) } + +func (cus *CustomFieldMilvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error { + return cus.Milvus2x.StartBatchUpsert(ctx, collection, data) +} diff --git a/core/dbclient/milvus2x.go b/core/dbclient/milvus2x.go index cec02de..9ba00b8 100644 --- a/core/dbclient/milvus2x.go +++ b/core/dbclient/milvus2x.go @@ -28,6 +28,7 @@ func (this *Milvus2x) GetMilvus() client.Client { return this.milvus } +// NewMilvus2xClient 这里为 target milvus的统一创建入口,和source区分开 func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { log.Info("begin to new milvus2x client", @@ -67,11 +68,20 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { return nil, err } - log.Info("[Milvus2x] begin to test connect", + log.Info("[Milvus2x] begin to test target connect", zap.String("endpoint", cfg.Endpoint), zap.String("username", cfg.UserName), + zap.String("databaseName", cfg.Database), zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) + + if cfg.Database != "" && len(cfg.Database) > 0 { + err = useDatabase(cfg, milvus, ctx) + if err != nil { + return nil, err + } + } + _, err = milvus.HasCollection(ctx, "test") if err != nil { 
return nil, err @@ -84,6 +94,31 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { return c, nil } +func useDatabase(cfg *config.Milvus2xConfig, milvus client.Client, ctx context.Context) error { + dbs, err := milvus.ListDatabases(ctx) + if err != nil { + return err + } + var dbExists = false + for _, db := range dbs { + if db.Name == cfg.Database { + dbExists = true + break + } + } + if !dbExists { + err = milvus.CreateDatabase(ctx, cfg.Database) + if err != nil { + return err + } + } + err = milvus.UsingDatabase(ctx, cfg.Database) + if err != nil { + return err + } + return nil +} + func (this *Milvus2x) CheckNeedCreateCollection(ctx context.Context, createParam *common.CollectionParam) error { log.Info("Begin to CheckNeedCreateCollection,", zap.String("collection", createParam.CollectionName)) exist, err := this.milvus.HasCollection(ctx, createParam.CollectionName) @@ -243,6 +278,16 @@ func (this *Milvus2x) StartBatchInsert(ctx context.Context, collection string, d log.L().Info("[Loader] BatchInsert return err", zap.Error(err)) return err } - log.LL(ctx).Info("[Loader] success to batchInsert to Milvus", zap.String("col", collection)) + log.LL(ctx).Info("[Loader] success to BatchInsert to Milvus", zap.String("col", collection)) + return nil +} + +func (this *Milvus2x) StartBatchUpsert(ctx context.Context, collection string, data *milvus2x.Milvus2xData) error { + _, err := this.milvus.Upsert(ctx, collection, "", data.Columns...) 
+ if err != nil { + log.L().Info("[Loader] BatchUpsert return err", zap.Error(err)) + return err + } + log.LL(ctx).Info("[Loader] success to BatchUpsert to Milvus", zap.String("col", collection)) return nil } diff --git a/core/loader/cus_milvus2x_loader.go b/core/loader/cus_milvus2x_loader.go index 89eb9ff..494e63a 100644 --- a/core/loader/cus_milvus2x_loader.go +++ b/core/loader/cus_milvus2x_loader.go @@ -121,8 +121,12 @@ func (this *CustomMilvus2xLoader) compareResult(ctx context.Context) error { return nil } -func (this *CustomMilvus2xLoader) WriteByBatchInsert(ctx context.Context, data *milvus2x.Milvus2xData) error { +func (this *CustomMilvus2xLoader) BatchWrite(ctx context.Context, data *milvus2x.Milvus2xData) error { - log.LL(ctx).Info("[Loader] Begin to write data by batchInsert sdk to milvus", zap.String("collection", this.runtimeCollectionNames[0])) - return this.CusMilvus2x.StartBatchInsert(ctx, this.runtimeCollectionNames[0], data) + log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", this.runtimeCollectionNames[0])) + if this.cfg.TargetMilvus2xCfg.WriteMode != "" && this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT { + return this.CusMilvus2x.StartBatchUpsert(ctx, this.runtimeCollectionNames[0], data) + } else { + return this.CusMilvus2x.StartBatchInsert(ctx, this.runtimeCollectionNames[0], data) + } } diff --git a/starter/migration/milvus2x_starter.go b/starter/migration/milvus2x_starter.go index 3c9fae9..03568c7 100644 --- a/starter/migration/milvus2x_starter.go +++ b/starter/migration/milvus2x_starter.go @@ -72,7 +72,7 @@ func (starter *Starter) dumpByIterator(ctx context.Context, collCfg *milvus2xtyp func (starter *Starter) loadByBatchInsert(ctx context.Context, dataChannel chan *milvus2x.Milvus2xData) error { for data := range dataChannel { - err := starter.Loader.WriteByBatchInsert(ctx, data) + err := starter.Loader.BatchWrite(ctx, data) if err != nil { return err } diff --git 
a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index 9d67035..78c6447 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -110,6 +110,7 @@ func (milvus23 *Milvus23VerClient) DescCollection(ctx context.Context, collectio return collEntity, nil } +// 这里统一给source创建milvus client, 和target区分开 func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, error) { log.Info("[Milvus23x] begin to new milvus client", zap.String("endPoint", cfg.Endpoint)) @@ -148,12 +149,19 @@ func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, e return nil, err } - log.Info("[Milvus23x] begin to test connect", + log.Info("[Milvus23x] begin to test source connect", zap.String("endpoint", cfg.Endpoint), zap.String("username", cfg.UserName), + zap.String("databaseName", cfg.Database), zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) + if cfg.Database != "" && len(cfg.Database) > 0 { + err := milvus.UsingDatabase(ctx, cfg.Database) + if err != nil { + return nil, err + } + } _, err = milvus.HasCollection(ctx, "test") if err != nil { return nil, err From 797c4b7a47380a9f15810118608673e9d916446c Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Tue, 6 Aug 2024 16:02:42 +0800 Subject: [PATCH 2/4] update readme --- README_2X.md | 33 +++++++++++++++++++++++++++++++-- README_ES.md | 10 ++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/README_2X.md b/README_2X.md index 9f2e308..223a346 100644 --- a/README_2X.md +++ b/README_2X.md @@ -85,7 +85,7 @@ meta: #...... ... ``` -- if you want to customize source or target milvus grpc connection request or receive message max size, you can add below config like: +- if want to customize source or target milvus grpc connection request or receive message max size, you can add like below cfg. 
(Otherwise, requests may fail because the data exceeds the gRPC server's default maximum message size.) ```yaml ... source: @@ -103,4 +103,33 @@ meta: maxCallRecvMsgSize: 67108864 maxCallSendMsgSize: 268435456 ... -... \ No newline at end of file +... +``` + +- If the Source Milvus collection is not in the `default` database, you can add `source.milvus2x.database` to specify the database name. +```yaml +... + source: + milvus2x: + ... + database: my_database +... +``` +- If you want to migrate data into a Target Milvus collection that is not in the `default` database, you can add `target.milvus2x.database` to specify the database name; the database is created automatically if it does not exist. +```yaml +... + target: + milvus2x: + ... + database: my_database +... +``` +- If you want to write to the Target Milvus in `upsert` mode, add `target.milvus2x.writeMode=upsert`. In `upsert` mode, an entity whose primary key already exists in the target is overwritten instead of being inserted again, so source rows that share a primary key are deduplicated. +```yaml +... + target: + milvus2x: + ... + writeMode: upsert +... +``` \ No newline at end of file diff --git a/README_ES.md b/README_ES.md index d296097..8dae8b3 100644 --- a/README_ES.md +++ b/README_ES.md @@ -173,6 +173,16 @@ target: useSSL: true ``` +- If you want to migrate data into a Target Milvus collection that is not in the `default` database, you can add `target.milvus2x.database` to specify the database name; the database is created automatically if it does not exist. +```yaml +... + target: + milvus2x: + ... + database: my_database +... 
+``` + ## migration.yaml reference ### `dumper` From 66ffe17dc5245d8a9c9a0bdfffc5917ab102333f Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Tue, 6 Aug 2024 16:52:20 +0800 Subject: [PATCH 3/4] optimize --- core/dbclient/milvus2x.go | 2 +- core/loader/cus_milvus2x_loader.go | 2 +- storage/milvus2x/milvus2_3_ver.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/dbclient/milvus2x.go b/core/dbclient/milvus2x.go index 9ba00b8..adde244 100644 --- a/core/dbclient/milvus2x.go +++ b/core/dbclient/milvus2x.go @@ -75,7 +75,7 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) - if cfg.Database != "" && len(cfg.Database) > 0 { + if cfg.Database != "" { err = useDatabase(cfg, milvus, ctx) if err != nil { return nil, err diff --git a/core/loader/cus_milvus2x_loader.go b/core/loader/cus_milvus2x_loader.go index 494e63a..5fdc8ab 100644 --- a/core/loader/cus_milvus2x_loader.go +++ b/core/loader/cus_milvus2x_loader.go @@ -124,7 +124,7 @@ func (this *CustomMilvus2xLoader) compareResult(ctx context.Context) error { func (this *CustomMilvus2xLoader) BatchWrite(ctx context.Context, data *milvus2x.Milvus2xData) error { log.LL(ctx).Info("[Loader] Begin to batchWrite data to milvus", zap.String("collection", this.runtimeCollectionNames[0])) - if this.cfg.TargetMilvus2xCfg.WriteMode != "" && this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT { + if this.cfg.TargetMilvus2xCfg.WriteMode == common.UPSERT { return this.CusMilvus2x.StartBatchUpsert(ctx, this.runtimeCollectionNames[0], data) } else { return this.CusMilvus2x.StartBatchInsert(ctx, this.runtimeCollectionNames[0], data) diff --git a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index 78c6447..3686b83 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -156,7 +156,7 @@ func 
_createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, e zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) - if cfg.Database != "" && len(cfg.Database) > 0 { + if cfg.Database != "" { err := milvus.UsingDatabase(ctx, cfg.Database) if err != nil { return nil, err From d2c829d78f8ec6e6fdc2bbd2eba8346dcd57002d Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Tue, 6 Aug 2024 17:01:47 +0800 Subject: [PATCH 4/4] readme upgrade go version --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dcc5476..68c2ddd 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ datas to milvus 2.x. |:-----------------------------------------|:------------------| | [Milvus](https://milvus.io/) | 0.9.x, 1.x or 2.x | | [Elasticsearch](https://www.elastic.co/) | 7.x or 8.x | -| go | 1.20.2 or later | +| go | 1.22.2 or later | - Data Format Support