From 4d0a69551bb2b521cc411dad8948b41fd2c85382 Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Tue, 2 Apr 2024 20:29:45 +0800 Subject: [PATCH] Add test cases for mmap Signed-off-by: ThreadDao --- test/base/milvus_client.go | 16 ++ test/testcases/load_release_test.go | 305 ++++++++++++++++++++++++++++ 2 files changed, 321 insertions(+) diff --git a/test/base/milvus_client.go b/test/base/milvus_client.go index a5835ae0..bb94dd2a 100644 --- a/test/base/milvus_client.go +++ b/test/base/milvus_client.go @@ -198,6 +198,14 @@ func (mc *MilvusClient) HasCollection(ctx context.Context, collName string) (boo return has, err } +// AlterCollection changes collection attributes +func (mc *MilvusClient) AlterCollection(ctx context.Context, collName string, attrs ...entity.CollectionAttribute) error { + preRequest("AlterCollection", ctx, collName) + err := mc.mClient.AlterCollection(ctx, collName, attrs...) + postResponse("AlterCollection", err) + return err +} + // -- alias -- // CreateAlias Create Alias @@ -356,6 +364,14 @@ func (mc *MilvusClient) GetIndexState(ctx context.Context, collName string, fiel return indexState, err } +// AlterIndex modifies the index params. +func (mc *MilvusClient) AlterIndex(ctx context.Context, collName string, indexName string, opts ...client.IndexOption) error { + preRequest("AlterIndex", ctx, collName, indexName, opts) + err := mc.mClient.AlterIndex(ctx, collName, indexName, opts...) + postResponse("AlterIndex", err) + return err +} + // -- basic operation -- // Insert insert data diff --git a/test/testcases/load_release_test.go b/test/testcases/load_release_test.go index 0d0e86af..df8d878e 100644 --- a/test/testcases/load_release_test.go +++ b/test/testcases/load_release_test.go @@ -5,9 +5,12 @@ package testcases import ( "fmt" "log" + "strconv" "testing" "time" + "github.com/milvus-io/milvus-sdk-go/v2/client" + "github.com/milvus-io/milvus-sdk-go/v2/entity" "github.com/stretchr/testify/require" @@ -512,3 +515,305 @@ func TestReleaseMultiPartitions(t *testing.T) { []string{common.DefaultIntFieldName}) common.CheckErr(t, errQuery, false, "not loaded into memory when query") } + +// test mmap collection raw data and index +// create -> insert -> flush -> index with mmap -> load -> alter collection with mmap -> reload -> read op +func TestMmapCollectionIndexDefault(t *testing.T) { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert [0, 3000) -> flush + cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + collName := createCollection(ctx, t, mc, cp) + + dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: AllFields, start: 0, + nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true} + insertData(ctx, t, mc, dp) + _ = mc.Flush(ctx, collName, false) + + // create vector index with mmap + GenDefaultIndexParamsForAllVectors() + indexHnsw, _ := entity.NewIndexHNSW(entity.L2, 8, 96) + indexBinary, _ := entity.NewIndexBinIvfFlat(entity.JACCARD, 64) + for _, fieldName := range common.AllVectorsFieldsName { + if fieldName == common.DefaultBinaryVecFieldName { + err := mc.CreateIndex(ctx, collName, fieldName, indexBinary, false, client.WithMmap(true)) + common.CheckErr(t, err, true) + } else if fieldName == common.DefaultFloatVecFieldName { + err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false, client.WithMmap(true)) + common.CheckErr(t, err, true) + } else { + err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false) + common.CheckErr(t, err, true) + } + } + + // describe index and check mmap + for _, fieldName := range common.AllVectorsFieldsName { + if fieldName == common.DefaultFloatVecFieldName || fieldName == common.DefaultBinaryVecFieldName { + indexes, _ := mc.DescribeIndex(ctx, collName, fieldName) + require.Len(t, indexes, 1) + // check index mmap + require.Equal(t, "true", indexes[0].Params()["mmap.enabled"]) + } + } + + // load collection -> describe collection and check mmap false + _ = mc.LoadCollection(ctx, collName, false) + coll, _ := mc.DescribeCollection(ctx, collName) + require.Equal(t, "", coll.Properties["mmap.enabled"]) + + // alter collection and check collection mmap + _ = mc.ReleaseCollection(ctx, collName) + err := mc.AlterCollection(ctx, collName, entity.Mmap(true)) + common.CheckErr(t, err, true) + + //describe collection + mc.LoadCollection(ctx, collName, false) + coll, _ = mc.DescribeCollection(ctx, collName) + require.Equal(t, "true", coll.Properties["mmap.enabled"]) + + // query + queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"}) + common.CheckErr(t, err, true) + require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10) + common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false)) + + // search + queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector) + sp, _ := entity.NewIndexHNSWSearchParam(74) + expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName) + searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName, + entity.L2, common.DefaultTopK, sp) + common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false)) + + // hybrid search + queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector) + sReqs := []*client.ANNSearchRequest{ + client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)), + client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK), + } + _, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs) + common.CheckErr(t, errSearch, true) +} + +// test mmap collection raw data and index +// create -> insert -> flush -> index -> load -> alter collection with mmap -> alter index with mmap -> reload -> read op +func TestMmapAlterCollectionIndexDefault(t *testing.T) { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert [0, 3000) -> flush + cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + + dp := DataParams{DoInsert: true, CollectionFieldsType: AllFields, start: 0, + nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true} + + ips := GenDefaultIndexParamsForAllVectors() + collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithIndexParams(ips), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong))) + + // describe index and check mmap + for _, fieldName := range common.AllVectorsFieldsName { + indexes, _ := mc.DescribeIndex(ctx, collName, fieldName) + // check index mmap + require.Equal(t, "", indexes[0].Params()["mmap.enabled"]) + } + + //describe collection + mc.LoadCollection(ctx, collName, false) + coll, _ := mc.DescribeCollection(ctx, collName) + require.Equal(t, "", coll.Properties["mmap.enabled"]) + + // alter mmap: collection and index + _ = mc.ReleaseCollection(ctx, collName) + err := mc.AlterCollection(ctx, collName, entity.Mmap(true)) + common.CheckErr(t, err, true) + for _, fieldName := range common.AllVectorsFieldsName { + err := mc.AlterIndex(ctx, collName, fieldName, client.WithMmap(true)) + common.CheckErr(t, err, true) + } + + // load collection -> describe collection and check mmap false + //describe collection + mc.LoadCollection(ctx, collName, false) + coll, _ = mc.DescribeCollection(ctx, collName) + require.Equal(t, "true", coll.Properties["mmap.enabled"]) + for _, fieldName := range common.AllVectorsFieldsName { + idx, _ := mc.DescribeIndex(ctx, collName, fieldName) + require.Equal(t, "true", idx[0].Params()["mmap.enabled"]) + } + + // query + queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"}) + common.CheckErr(t, err, true) + require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10) + common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false)) + + // search + queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector) + sp, _ := entity.NewIndexHNSWSearchParam(74) + expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName) + searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName, + entity.L2, common.DefaultTopK, sp) + common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false)) + + // hybrid search + queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector) + sReqs := []*client.ANNSearchRequest{ + client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)), + client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK), + } + _, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs) + common.CheckErr(t, errSearch, true) +} + +// test mmap collection loaded +func TestMmapCollectionLoaded(t *testing.T) { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + + dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb, + dim: common.DefaultDim, EnableDynamicField: false} + + collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong))) + + // mmap collection raw data + err := mc.AlterCollection(ctx, collName, entity.Mmap(true)) + common.CheckErr(t, err, false, "can not alter mmap properties if collection loaded") + + // mmap index + err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(true)) + common.CheckErr(t, err, false, "can't alter index on loaded collection, please release the collection first") +} + +// test mmap collection which scalar field indexed +func TestMmapCollectionScalarIndexed(t *testing.T) { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + + dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb, + dim: common.DefaultDim, EnableDynamicField: false} + + collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong))) + + // create scalar index + for _, fName := range []string{common.DefaultIntFieldName, common.DefaultFloatFieldName} { + err := mc.CreateIndex(ctx, collName, fName, entity.NewScalarIndexWithType(entity.Inverted), false) + common.CheckErr(t, err, true) + } + + // mmap collection + mc.ReleaseCollection(ctx, collName) + err := mc.AlterCollection(ctx, collName, entity.Mmap(true)) + common.CheckErr(t, err, true) + err = mc.LoadCollection(ctx, collName, false) + common.CheckErr(t, err, true) +} + +// test mmap unsupported index: DiskANN, GPU-class +func TestMmapIndexUnsupported(t *testing.T) { + t.Skip("https://github.com/milvus-io/milvus-sdk-go/issues/714") + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + collName := createCollection(ctx, t, mc, cp) + + dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0, + nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false} + insertData(ctx, t, mc, dp) + mc.Flush(ctx, collName, false) + + //create index with mmap + idx, _ := entity.NewIndexDISKANN(entity.COSINE) + err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false, client.WithMmap(true)) + common.CheckErr(t, err, false, "index type DISKANN does not support mmap") + + //create scalar index with mmap + for _, idx := range []entity.Index{entity.NewScalarIndex(), entity.NewScalarIndexWithType(entity.Inverted)} { + err := mc.CreateIndex(ctx, collName, common.DefaultFloatFieldName, idx, false, client.WithMmap(true)) + common.CheckErr(t, err, false, "does not support mmap") + } +} + +func TestAlterIndexMmapUnsupportedIndex(t *testing.T) { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + collName := createCollection(ctx, t, mc, cp) + + dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0, + nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false} + insertData(ctx, t, mc, dp) + mc.Flush(ctx, collName, false) + + idxHnsw, _ := entity.NewIndexDISKANN(entity.IP) + err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false) + common.CheckErr(t, err, true) + for _, mmap := range []bool{true, false} { + err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap)) + common.CheckErr(t, err, false, "index type DISKANN does not support mmap: invalid parameter") + } +} + +func TestMmapAlterIndex(t *testing.T) { + t.Parallel() + for _, mmap := range []bool{true, false} { + ctx := createContext(t, time.Second*common.DefaultTimeout*2) + // connect + mc := createMilvusClient(ctx, t) + + // create -> insert -> flush -> index -> load + cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false, + ShardsNum: common.DefaultShards, Dim: common.DefaultDim} + collName := createCollection(ctx, t, mc, cp) + + dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0, + nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false} + insertData(ctx, t, mc, dp) + mc.Flush(ctx, collName, false) + + // create index and enable mmap + idxHnsw, _ := entity.NewIndexHNSW(entity.COSINE, 8, 96) + err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false, client.WithMmap(!mmap)) + common.CheckErr(t, err, true) + + // alter index and enable mmap + err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap)) + common.CheckErr(t, err, true) + + idx, _ := mc.DescribeIndex(ctx, collName, common.DefaultFloatVecFieldName) + require.Equal(t, strconv.FormatBool(mmap), idx[0].Params()["mmap.enabled"]) + + err = mc.LoadCollection(ctx, collName, false) + common.CheckErr(t, err, true) + + queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector) + sp, _ := entity.NewIndexHNSWSearchParam(74) + expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName) + searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName, + entity.COSINE, common.DefaultTopK, sp) + common.CheckOutputFields(t, searchRes[0].Fields, []string{common.DefaultIntFieldName, common.DefaultFloatFieldName, common.DefaultFloatVecFieldName}) + } +}