Skip to content

Commit

Permalink
Merge pull request #67 from yellow-shine/mertify-update
Browse files Browse the repository at this point in the history
Mertify update
  • Loading branch information
yellow-shine authored Oct 28, 2024
2 parents 139f4e5 + 6a587ea commit 5b85a19
Show file tree
Hide file tree
Showing 197 changed files with 4,681 additions and 2,095 deletions.
15 changes: 11 additions & 4 deletions .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ misc:
- branch: &BRANCHES
# In this pull request, the changes are based on the master branch
- &MASTER_BRANCH base=master
- &23_BRANCH base=2.3
- &24_BRANCH base=2.4
- &25_BRANCH base=2.5
# In this pull request, the changes are based on the 2.x(or 2.x.x) branch
- &2X_BRANCH base~=^2(\.\d+){1,2}$

Expand Down Expand Up @@ -297,10 +300,12 @@ pull_request_rules:
- lgtm
- approved

- name: master - Remove ci-passed label when status for code checker or ut is not success
- name: master or 2.5 - Remove ci-passed label when status for code checker or ut is not success
conditions:
# branch condition: in this pull request, the changes are based on any branch referenced by BRANCHES
- *MASTER_BRANCH
- or:
- *MASTER_BRANCH
- *25_BRANCH
- label!=manual-pass
- *source_code_files
- or:
Expand All @@ -317,9 +322,11 @@ pull_request_rules:
remove:
- ci-passed

- name: 2.x - Remove ci-passed label when status for code checker or ut is not success
- name: 2.3 or 2.4 - Remove ci-passed label when status for code checker or ut is not success
conditions:
- *2X_BRANCH
- or:
- *23_BRANCH
- *24_BRANCH
- label!=manual-pass
- *source_code_files
- or:
Expand Down
1 change: 1 addition & 0 deletions client/index/disk_ann.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewDiskANNIndex(metricType MetricType) Index {
return &diskANNIndex{
baseIndex: baseIndex{
metricType: metricType,
indexType: DISKANN,
},
}
}
2 changes: 2 additions & 0 deletions client/index/flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewFlatIndex(metricType MetricType) Index {
return flatIndex{
baseIndex: baseIndex{
metricType: metricType,
indexType: Flat,
},
}
}
Expand All @@ -54,6 +55,7 @@ func NewBinFlatIndex(metricType MetricType) Index {
return binFlatIndex{
baseIndex: baseIndex{
metricType: metricType,
indexType: BinFlat,
},
}
}
2 changes: 2 additions & 0 deletions cmd/components/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand All @@ -50,6 +51,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
}

func (n *Proxy) Prepare() error {
indexparamcheck.ValidateParamTable()
return n.svr.Prepare()
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/bytedance/sonic v1.12.2
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cockroachdb/redact v1.1.3
github.com/goccy/go-json v0.10.3
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jolestar/go-commons-pool/v2 v2.1.2
Expand Down Expand Up @@ -131,7 +132,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
20 changes: 12 additions & 8 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
while (data.arrow_reader_channel->pop(r)) {
auto chunk =
create_chunk(field_meta,
IsVectorDataType(field_meta.get_data_type())
IsVectorDataType(field_meta.get_data_type()) &&
!IsSparseFloatVectorDataType(
field_meta.get_data_type())
? field_meta.get_dim()
: 1,
r->reader);
Expand Down Expand Up @@ -549,13 +551,15 @@ ChunkedSegmentSealedImpl::MapFieldData(const FieldId field_id,
// indices,
// element_indices,
// valid_data);
auto chunk = create_chunk(field_meta,
IsVectorDataType(field_meta.get_data_type())
? field_meta.get_dim()
: 1,
file,
file_offset,
r->reader);
auto chunk = create_chunk(
field_meta,
IsVectorDataType(field_meta.get_data_type()) &&
!IsSparseFloatVectorDataType(field_meta.get_data_type())
? field_meta.get_dim()
: 1,
file,
file_offset,
r->reader);
file_offset += chunk->Size();
chunks.push_back(chunk);
}
Expand Down
73 changes: 73 additions & 0 deletions internal/core/src/segcore/vector_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,79 @@
#include "index/IndexFactory.h"
#include "pb/index_cgo_msg.pb.h"

CStatus
ValidateIndexParams(const char* index_type,
enum CDataType data_type,
const uint8_t* serialized_index_params,
const uint64_t length) {
try {
auto index_params =
std::make_unique<milvus::proto::indexcgo::IndexParams>();
auto res =
index_params->ParseFromArray(serialized_index_params, length);
AssertInfo(res, "Unmarshall index params failed");

knowhere::Json json;

for (size_t i = 0; i < index_params->params_size(); i++) {
auto& param = index_params->params(i);
json[param.key()] = param.value();
}

milvus::DataType dataType(static_cast<milvus::DataType>(data_type));

knowhere::Status status;
std::string error_msg;
if (dataType == milvus::DataType::VECTOR_BINARY) {
status = knowhere::IndexStaticFaced<knowhere::bin1>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else if (dataType == milvus::DataType::VECTOR_FLOAT) {
status = knowhere::IndexStaticFaced<knowhere::fp32>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else if (dataType == milvus::DataType::VECTOR_BFLOAT16) {
status = knowhere::IndexStaticFaced<knowhere::bf16>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else if (dataType == milvus::DataType::VECTOR_FLOAT16) {
status = knowhere::IndexStaticFaced<knowhere::fp16>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else if (dataType == milvus::DataType::VECTOR_SPARSE_FLOAT) {
status = knowhere::IndexStaticFaced<knowhere::fp32>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else {
status = knowhere::Status::invalid_args;
}
CStatus cStatus;
if (status == knowhere::Status::success) {
cStatus.error_code = milvus::Success;
cStatus.error_msg = "";
} else {
cStatus.error_code = milvus::ConfigInvalid;
cStatus.error_msg = strdup(error_msg.c_str());
}
return cStatus;
} catch (std::exception& e) {
auto cStatus = CStatus();
cStatus.error_code = milvus::UnexpectedError;
cStatus.error_msg = strdup(e.what());
return cStatus;
}
}

int
GetIndexListSize() {
return knowhere::IndexFactory::Instance().GetIndexFeatures().size();
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/segcore/vector_index_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ extern "C" {
#include <stdbool.h>
#include "common/type_c.h"

CStatus
ValidateIndexParams(const char* index_type,
enum CDataType data_type,
const uint8_t* index_params,
const uint64_t length);

int
GetIndexListSize();

Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *L0CompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.mockAlloc = allocator.NewMockAllocator(s.T())
//s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
// s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
}

func (s *L0CompactionTaskSuite) SetupSubTest() {
Expand Down
41 changes: 38 additions & 3 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,43 @@ package datacoord

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.CompactionTask {
return &metricsinfo.CompactionTask{
PlanID: task.PlanID,
CollectionID: task.CollectionID,
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: task.StartTime,
EndTime: task.EndTime,
TotalRows: task.TotalRows,
InputSegments: task.InputSegments,
ResultSegments: task.ResultSegments,
}
}

type compactionTaskMeta struct {
sync.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog
// currently only clustering compaction task is stored in persist meta
compactionTasks map[int64]map[int64]*datapb.CompactionTask // triggerID -> planID
taskStats *expirable.LRU[UniqueID, *metricsinfo.CompactionTask]
}

func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) {
Expand All @@ -43,6 +63,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal
ctx: ctx,
catalog: catalog,
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](1024, nil, time.Minute*60),
}
if err := csm.reloadFromKV(); err != nil {
return nil, err
Expand Down Expand Up @@ -125,16 +146,17 @@ func (csm *compactionTaskMeta) SaveCompactionTask(task *datapb.CompactionTask) e
log.Error("meta update: update compaction task fail", zap.Error(err))
return err
}
return csm.saveCompactionTaskMemory(task)
csm.saveCompactionTaskMemory(task)
return nil
}

func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) error {
func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) {
_, triggerIDExist := csm.compactionTasks[task.TriggerID]
if !triggerIDExist {
csm.compactionTasks[task.TriggerID] = make(map[int64]*datapb.CompactionTask, 0)
}
csm.compactionTasks[task.TriggerID][task.PlanID] = task
return nil
csm.taskStats.Add(task.PlanID, newCompactionTaskStats(task))
}

func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) error {
Expand All @@ -153,3 +175,16 @@ func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) e
}
return nil
}

func (csm *compactionTaskMeta) TaskStatsJSON() string {
tasks := csm.taskStats.Values()
if len(tasks) == 0 {
return ""
}

ret, err := json.Marshal(tasks)
if err != nil {
return ""
}
return string(ret)
}
49 changes: 49 additions & 0 deletions internal/datacoord/compaction_task_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package datacoord

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)

func TestCompactionTaskMetaSuite(t *testing.T) {
Expand Down Expand Up @@ -79,3 +82,49 @@ func (suite *CompactionTaskMetaSuite) TestGetCompactionTasksByCollectionAbnormal
res := suite.meta.GetCompactionTasksByCollection(101)
suite.Equal(1, len(res))
}

func (suite *CompactionTaskMetaSuite) TestTaskStatsJSON() {
task1 := &datapb.CompactionTask{
PlanID: 1,
CollectionID: 100,
Type: datapb.CompactionType_MergeCompaction,
State: datapb.CompactionTaskState_completed,
FailReason: "",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(time.Hour).Unix(),
TotalRows: 1000,
InputSegments: []int64{1, 2},
ResultSegments: []int64{3},
}
task2 := &datapb.CompactionTask{
PlanID: 2,
CollectionID: 101,
Type: datapb.CompactionType_MergeCompaction,
State: datapb.CompactionTaskState_completed,
FailReason: "",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(time.Hour).Unix(),
TotalRows: 2000,
InputSegments: []int64{4, 5},
ResultSegments: []int64{6},
}

// testing return empty string
actualJSON := suite.meta.TaskStatsJSON()
suite.Equal("", actualJSON)

err := suite.meta.SaveCompactionTask(task1)
suite.NoError(err)
err = suite.meta.SaveCompactionTask(task2)
suite.NoError(err)

expectedTasks := []*metricsinfo.CompactionTask{
newCompactionTaskStats(task1),
newCompactionTaskStats(task2),
}
expectedJSON, err := json.Marshal(expectedTasks)
suite.NoError(err)

actualJSON = suite.meta.TaskStatsJSON()
suite.JSONEq(string(expectedJSON), actualJSON)
}
Loading

0 comments on commit 5b85a19

Please sign in to comment.