diff --git a/.github/mergify.yml b/.github/mergify.yml index 0317162b4d121..c2258f6e5c126 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -502,6 +502,7 @@ pull_request_rules: add: - kind/feature + - name: Label enhancement PRs conditions: # branch condition: in this pull request, the changes are based on any branch referenced by BRANCHES diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9d6de7e00cb2e..ff35477d9b466 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -559,9 +559,9 @@ dataCoord: # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. taskPrioritizer: default + taskQueueCapacity: 256 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: 10 - workerMaxParallelTaskNum: 2 dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc clustering: diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h index 24db41dcd0198..5f9a40c0e43d5 100644 --- a/internal/core/src/common/Chunk.h +++ b/internal/core/src/common/Chunk.h @@ -66,6 +66,11 @@ class Chunk { return data_; } + const char* + RawData() const { + return data_; + } + virtual bool isValid(int offset) { return valid_[offset]; diff --git a/internal/core/src/exec/expression/CompareExpr.cpp b/internal/core/src/exec/expression/CompareExpr.cpp index 8916d366c2846..7044f5917f11f 100644 --- a/internal/core/src/exec/expression/CompareExpr.cpp +++ b/internal/core/src/exec/expression/CompareExpr.cpp @@ -64,15 +64,16 @@ PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op) { right_current_chunk_id_, right_current_chunk_pos_); for (int i = 0; i < real_batch_size; ++i) { - if (!left().has_value() || !right().has_value()) { + auto left_value = left(), right_value = right(); + if 
(!left_value.has_value() || !right_value.has_value()) { res[i] = false; valid_res[i] = false; continue; } res[i] = boost::apply_visitor(milvus::query::Relational{}, - left().value(), - right().value()); + left_value.value(), + right_value.value()); } return res_vec; } else { diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h index a051d31f774d2..09dbd2597cbb4 100644 --- a/internal/core/src/mmap/ChunkedColumn.h +++ b/internal/core/src/mmap/ChunkedColumn.h @@ -83,7 +83,7 @@ class ChunkedColumnBase : public ColumnBase { AssertInfo(chunks_.size() == 1, "only support one chunk, but got {} chunk(s)", chunks_.size()); - return chunks_[0]->Data(); + return chunks_[0]->RawData(); } bool diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 63eec8e63d5c0..e2674454ac4d0 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -12,3 +12,11 @@ add_source_at_current_directory_recursively() add_library(milvus_segcore OBJECT ${SOURCE_FILES}) + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + # Define CHECK_SORTED only for debug builds: the C++ side gates on + # #ifdef CHECK_SORTED, so -DCHECK_SORTED=OFF would still enable the checks. + add_definitions(-DCHECK_SORTED) +else() + # Release builds: leave CHECK_SORTED undefined. +endif() \ No newline at end of file diff --git a/internal/core/src/segcore/SegmentChunkReader.cpp b/internal/core/src/segcore/SegmentChunkReader.cpp index 432f0832c84a3..919464520564d 100644 --- a/internal/core/src/segcore/SegmentChunkReader.cpp +++ b/internal/core/src/segcore/SegmentChunkReader.cpp @@ -25,21 +25,13 @@ SegmentChunkReader::GetChunkDataAccessor(FieldId field_id, if (index) { auto& indexing = const_cast&>( segment_->chunk_scalar_index(field_id, current_chunk_id)); - auto current_chunk_size = segment_->type() == SegmentType::Growing ?
SizePerChunk() - : active_count_; if (indexing.HasRawData()) { - return [&, current_chunk_size]() -> const data_access_type { - if (current_chunk_pos >= current_chunk_size) { - current_chunk_id++; - current_chunk_pos = 0; - indexing = const_cast&>( - segment_->chunk_scalar_index(field_id, - current_chunk_id)); + return [&]() -> const data_access_type { + if (current_chunk_pos >= active_count_) { + return std::nullopt; } - auto raw = indexing.Reverse_Lookup(current_chunk_pos); - current_chunk_pos++; + auto raw = indexing.Reverse_Lookup(current_chunk_pos++); if (!raw.has_value()) { return std::nullopt; } @@ -85,21 +77,12 @@ SegmentChunkReader::GetChunkDataAccessor( auto& indexing = const_cast&>( segment_->chunk_scalar_index(field_id, current_chunk_id)); - auto current_chunk_size = segment_->type() == SegmentType::Growing - ? SizePerChunk() - : active_count_; - if (indexing.HasRawData()) { - return [&, current_chunk_size]() mutable -> const data_access_type { - if (current_chunk_pos >= current_chunk_size) { - current_chunk_id++; - current_chunk_pos = 0; - indexing = const_cast&>( - segment_->chunk_scalar_index( - field_id, current_chunk_id)); + return [&]() mutable -> const data_access_type { + if (current_chunk_pos >= active_count_) { + return std::nullopt; } - auto raw = indexing.Reverse_Lookup(current_chunk_pos); - current_chunk_pos++; + auto raw = indexing.Reverse_Lookup(current_chunk_pos++); if (!raw.has_value()) { return std::nullopt; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index dea574191845d..4371735fa57b5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -54,6 +54,41 @@ namespace milvus::segcore { +#ifdef CHECK_SORTED +#define ASSERT_COLUMN_ORDERED(data_type, column) \ + { \ + switch (data_type) { \ + case DataType::INT64: { \ + auto col = \ + std::dynamic_pointer_cast(column); \ + auto pks = 
reinterpret_cast(col->Data()); \ for (int i = 1; i < col->NumRows(); ++i) { \ assert(pks[i - 1] <= pks[i] && \ "INT64 Column is not ordered!"); \ } \ break; \ } \ case DataType::VARCHAR: { \ auto col = std::dynamic_pointer_cast< \ SingleChunkVariableColumn>(column); \ auto pks = col->Views(); \ for (int i = 1; i < col->NumRows(); ++i) { \ assert(pks[i - 1] <= pks[i] && \ "VARCHAR Column is not ordered!"); \ } \ break; \ } \ default: { \ PanicInfo(DataTypeInvalid, \ fmt::format("unsupported primary key data type {}", \ data_type)); \ } \ } \ } +#else +#define ASSERT_COLUMN_ORDERED(data_type, column) ((void)0) +#endif + static inline void set_bit(BitsetType& bitset, FieldId field_id, bool flag = true) { auto pos = field_id.get() - START_USER_FIELDID; @@ -458,11 +493,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { // set pks to offset // if the segments are already sorted by pk, there is no need to build a pk offset index. // it can directly perform a binary search on the pk column.
- if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) { - AssertInfo(field_id.get() != -1, "Primary key is -1"); - AssertInfo(insert_record_.empty_pks(), "already exists"); - insert_record_.insert_pks(data_type, column); - insert_record_.seal_pks(); + if (schema_->get_primary_field_id() == field_id) { + if (!is_sorted_by_pk_) { + AssertInfo(field_id.get() != -1, "Primary key is -1"); + AssertInfo(insert_record_.empty_pks(), "already exists"); + insert_record_.insert_pks(data_type, column); + insert_record_.seal_pks(); + } else { + ASSERT_COLUMN_ORDERED(data_type, column); + } } bool use_temp_index = false; @@ -889,74 +928,6 @@ SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const { return pk_offsets; } -std::shared_ptr -SegmentSealedImpl::get_deleted_bitmap_s(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - Timestamp query_timestamp) const { - // if insert_barrier and del_barrier have not changed, use cache data directly - bool hit_cache = false; - int64_t old_del_barrier = 0; - auto current = delete_record.clone_lru_entry( - insert_barrier, del_barrier, old_del_barrier, hit_cache); - if (hit_cache) { - return current; - } - - auto bitmap = current->bitmap_ptr; - - int64_t start, end; - if (del_barrier < old_del_barrier) { - // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp - // so these deletion records do not take effect in query/search - // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0 - // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] - start = del_barrier; - end = old_del_barrier; - } else { - // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] - // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] - 
start = old_del_barrier; - end = del_barrier; - } - - // Avoid invalid calculations when there are a lot of repeated delete pks - std::unordered_map delete_timestamps; - for (auto del_index = start; del_index < end; ++del_index) { - auto pk = delete_record.pks()[del_index]; - auto timestamp = delete_record.timestamps()[del_index]; - - delete_timestamps[pk] = timestamp > delete_timestamps[pk] - ? timestamp - : delete_timestamps[pk]; - } - - for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = search_pk(pk, insert_barrier); - for (auto offset : segOffsets) { - int64_t insert_row_offset = offset.get(); - - // The deletion record do not take effect in search/query, - // and reset bitmap to 0 - if (timestamp > query_timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // Insert after delete with same pk, delete will not task effect on this insert record, - // and reset bitmap to 0 - if (insert_record_.timestamps_[offset.get()] >= timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // insert data corresponding to the insert_row_offset will be ignored in search/query - bitmap->set(insert_row_offset); - } - } - - delete_record.insert_lru_entry(current); - return current; -} - void SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, int64_t ins_barrier, @@ -968,16 +939,16 @@ SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, auto bitmap_holder = std::shared_ptr(); - if (!is_sorted_by_pk_) { - bitmap_holder = get_deleted_bitmap(del_barrier, - ins_barrier, - deleted_record_, - insert_record_, - timestamp); - } else { - bitmap_holder = get_deleted_bitmap_s( - del_barrier, ins_barrier, deleted_record_, timestamp); - } + auto search_fn = [this](const PkType& pk, int64_t barrier) { + return this->search_pk(pk, barrier); + }; + bitmap_holder = get_deleted_bitmap(del_barrier, + ins_barrier, + deleted_record_, + insert_record_, + timestamp, + is_sorted_by_pk_, + search_fn); if (!bitmap_holder || 
!bitmap_holder->bitmap_ptr) { return; diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 1c07c1047a7e1..d5d633992535d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -121,12 +121,6 @@ class SegmentSealedImpl : public SegmentSealed { std::vector search_pk(const PkType& pk, int64_t insert_barrier) const; - std::shared_ptr - get_deleted_bitmap_s(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - Timestamp query_timestamp) const; - std::unique_ptr get_vector(FieldId field_id, const int64_t* ids, diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index 226e0da6441f0..ed6336aa9d4c3 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -110,11 +110,15 @@ MergeDataArray(std::vector& merge_bases, template std::shared_ptr -get_deleted_bitmap(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - const InsertRecord& insert_record, - Timestamp query_timestamp) { +get_deleted_bitmap( + int64_t del_barrier, + int64_t insert_barrier, + DeletedRecord& delete_record, + const InsertRecord& insert_record, + Timestamp query_timestamp, + bool is_sorted_by_pk = false, + const std::function(const PkType&, int64_t)>& + search_fn = nullptr) { // if insert_barrier and del_barrier have not changed, use cache data directly bool hit_cache = false; int64_t old_del_barrier = 0; @@ -153,7 +157,9 @@ get_deleted_bitmap(int64_t del_barrier, } for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = insert_record.search_pk(pk, insert_barrier); + auto segOffsets = is_sorted_by_pk + ? 
search_fn(pk, insert_barrier) : insert_record.search_pk(pk, insert_barrier); for (auto offset : segOffsets) { int64_t insert_row_offset = offset.get(); diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 05193eb31d0c1..82103062598f0 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -37,9 +37,12 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) +// TODO: for now we only warn about long-executing/long-queuing tasks; +// long-queuing tasks should eventually be dropped, since compaction task selection is only locally optimal. var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{ datapb.CompactionType_MixCompaction: 30 * time.Minute, datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute, @@ -180,8 +183,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, ) *compactionPlanHandler { + // Higher capacity will have better ordering in priority, but consumes more memory. + // TODO[GOOSE]: Higher capacity also makes tasks wait longer, which we need to get rid of. capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt() return &compactionPlanHandler{ - queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory.
+ queueTasks: *NewCompactionQueue(capacity, getPrioritizer()), chManager: cm, meta: meta, sessions: sessions, @@ -293,6 +299,7 @@ func (c *compactionPlanHandler) loadMeta() { state := task.GetState() if state == datapb.CompactionTaskState_completed || state == datapb.CompactionTaskState_cleaned || + state == datapb.CompactionTaskState_timeout || state == datapb.CompactionTaskState_unknown { log.Info("compactionPlanHandler loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index c0769ff02c62b..002b8a2635f05 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -80,8 +79,6 @@ func (t *l0CompactionTask) Process() bool { return t.processPipelining() case datapb.CompactionTaskState_executing: return t.processExecuting() - case datapb.CompactionTaskState_timeout: - return t.processTimeout() case datapb.CompactionTaskState_meta_saved: return t.processMetaSaved() case datapb.CompactionTaskState_completed: @@ -133,16 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool { return false } switch result.GetState() { - case datapb.CompactionTaskState_executing: - // will L0Compaction be timeouted? 
- if t.checkTimeout() { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err != nil { - log.Warn("l0CompactionTask failed to set task timeout state", zap.Error(err)) - return false - } - return t.processTimeout() - } case datapb.CompactionTaskState_completed: t.result = result if err := t.saveSegmentMeta(); err != nil { @@ -190,16 +177,6 @@ func (t *l0CompactionTask) processCompleted() bool { return true } -func (t *l0CompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return true -} - func (t *l0CompactionTask) processFailed() bool { if t.hasAssignedWorker() { err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ @@ -359,24 +336,6 @@ func (t *l0CompactionTask) hasAssignedWorker() bool { return t.GetTaskProto().GetNodeID() != 0 && t.GetTaskProto().GetNodeID() != NullNodeID } -func (t *l0CompactionTask) checkTimeout() bool { - if t.GetTaskProto().GetTimeoutInSeconds() > 0 { - start := time.Unix(t.GetTaskProto().GetStartTime(), 0) - diff := time.Since(start).Seconds() - if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) { - log.Warn("compaction timeout", - zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), - zap.Int64("planID", t.GetTaskProto().GetPlanID()), - zap.Int64("nodeID", t.GetTaskProto().GetNodeID()), - zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()), - zap.Time("startTime", start), - ) - return true - } - } - return false -} - func (t *l0CompactionTask) SetNodeID(id UniqueID) error { return t.updateAndSaveTaskMeta(setNodeID(id)) } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 116d1f2597a51..ede8ddd78e2cf 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ 
b/internal/datacoord/compaction_task_l0_test.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "testing" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -405,48 +404,14 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything). - Return(&datapb.CompactionPlanResult{ - PlanID: t.GetTaskProto().GetPlanID(), - State: datapb.CompactionTaskState_executing, - }, nil).Twice() - - got := t.Process() - s.False(got) - - // test timeout - t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10)) - - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false). - RunAndReturn(func(inputs []int64, compacting bool) { - s.ElementsMatch(inputs, t.GetTaskProto().GetInputSegments()) - s.False(compacting) - }).Once() - - got = t.Process() - s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) - }) - - s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() { - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() - t := s.generateTestL0Task(datapb.CompactionTaskState_executing) - t.updateAndSaveTaskMeta(setNodeID(100)) - s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything). 
Return(&datapb.CompactionPlanResult{ PlanID: t.GetTaskProto().GetPlanID(), State: datapb.CompactionTaskState_executing, }, nil).Once() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() - - t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10)) got := t.Process() s.False(got) - s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState()) }) s.Run("test executing with result completed", func() { @@ -545,20 +510,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState()) }) - s.Run("test timeout", func() { - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) - t.updateAndSaveTaskMeta(setNodeID(100)) - s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.Require().False(isCompacting) - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() - - got := t.Process() - s.True(got) - }) - s.Run("test metaSaved success", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index c3deaacd89def..b028f7f66cc98 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -3,7 +3,6 @@ package datacoord import ( "context" "fmt" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -112,16 +111,6 @@ func (t *mixCompactionTask) processExecuting() bool { return false } switch result.GetState() { - case datapb.CompactionTaskState_executing: - if t.checkTimeout() { - log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", 
t.GetTaskProto().GetTimeoutInSeconds()), zap.Int64("startTime", t.GetTaskProto().GetStartTime())) - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return t.processTimeout() - } case datapb.CompactionTaskState_completed: t.result = result if len(result.GetSegments()) == 0 { @@ -195,8 +184,6 @@ func (t *mixCompactionTask) Process() bool { processResult = t.processPipelining() case datapb.CompactionTaskState_executing: processResult = t.processExecuting() - case datapb.CompactionTaskState_timeout: - processResult = t.processTimeout() case datapb.CompactionTaskState_meta_saved: processResult = t.processMetaSaved() case datapb.CompactionTaskState_completed: @@ -250,16 +237,6 @@ func (t *mixCompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.taskProto.Load().(*datapb.CompactionTask).GetInputSegments(), false) } -func (t *mixCompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return true -} - func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask) for _, opt := range opts { @@ -286,16 +263,6 @@ func (t *mixCompactionTask) processFailed() bool { return true } -func (t *mixCompactionTask) checkTimeout() bool { - if t.GetTaskProto().GetTimeoutInSeconds() > 0 { - diff := time.Since(time.Unix(t.GetTaskProto().GetStartTime(), 0)).Seconds() - if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) { - return true - } - } - return false -} - func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { task := t.ShadowClone(opts...) 
err := t.saveTaskMeta(task) diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 00230dd658849..0a60e1a416734 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -2,7 +2,6 @@ package datacoord import ( "testing" - "time" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -93,44 +92,3 @@ func (s *MixCompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() { s.ErrorIs(err, merr.ErrSegmentNotFound) }) } - -func (s *MixCompactionTaskSuite) TestCompactionTimeout() { - channel := "Ch-1" - binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { - return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - Level: datapb.SegmentLevel_L1, - InsertChannel: channel, - State: commonpb.SegmentState_Flushed, - Binlogs: binLogs, - }} - }).Times(2) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything) - alloc := allocator.NewMockAllocator(s.T()) - alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) - task := newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 1, - TriggerID: 19530, - CollectionID: 1, - PartitionID: 10, - Type: datapb.CompactionType_MixCompaction, - NodeID: 1, - State: datapb.CompactionTaskState_executing, - InputSegments: []int64{200, 201}, - ResultSegments: []int64{100, 200}, - TimeoutInSeconds: 1, - }, alloc, s.mockMeta, s.mockSessMgr) - plan, err := task.BuildCompactionRequest() - task.plan = plan - s.Require().NoError(err) - time.Sleep(time.Second * 2) - - s.mockSessMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{ - State: datapb.CompactionTaskState_executing, - }, nil) - end := task.processExecuting() - s.Equal(true, end) - 
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().State) -} diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index ec345b4b75e9d..b0fb0c97f0eca 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -742,8 +742,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { s.handler.checkCompaction() t := s.handler.getCompactionTask(1) - // timeout - s.Nil(t) + s.NotNil(t) t = s.handler.getCompactionTask(2) // completed diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 48563a776313f..5a625d1347450 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -643,7 +643,7 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s // to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array sort.Strings(keyGroup) removeFn := func(partialKeys []string) error { - return ss.MetaKv.MultiRemove(keyGroup) + return ss.MetaKv.MultiRemove(partialKeys) } return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn) } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 850bbd25fdfd5..5bfcf4ab3fa45 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -564,9 +564,16 @@ func (t *hasCollectionTask) PreExecute(ctx context.Context) error { } func (t *hasCollectionTask) Execute(ctx context.Context) error { - var err error - t.result, err = t.rootCoord.HasCollection(ctx, t.HasCollectionRequest) - return merr.CheckRPCCall(t.result, err) + _, err := globalMetaCache.GetCollectionID(ctx, t.HasCollectionRequest.GetDbName(), t.HasCollectionRequest.GetCollectionName()) + t.result = &milvuspb.BoolResponse{} + // errors other than collection-not-found are returned to the caller + if err != nil && !errors.Is(err, merr.ErrCollectionNotFound) { + return err + } + // if err is non-nil here, it must be
ErrCollectionNotFound, result is false + // otherwise, result is true + t.result.Value = (err == nil) + return nil } func (t *hasCollectionTask) PostExecute(ctx context.Context) error { diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 8188274259de0..b5e633cd7b01d 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -551,7 +551,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error { if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 { // first page for iteration, need to set up sessionTs for iterator - t.result.SessionTs = t.BeginTs() + t.result.SessionTs = t.GetGuaranteeTimestamp() } log.Debug("Query PostExecute done") return nil diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 86810c2abc7c6..0b15831311609 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -745,7 +745,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error { t.result.CollectionName = t.request.GetCollectionName() if t.isIterator && t.request.GetGuaranteeTimestamp() == 0 { // first page for iteration, need to set up sessionTs for iterator - t.result.SessionTs = t.BeginTs() + t.result.SessionTs = t.SearchRequest.GetGuaranteeTimestamp() } metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds())) diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index e60d046f2d50c..2eb4b219e2921 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1081,8 +1081,23 @@ func TestHasCollectionTask(t *testing.T) { err = task.PreExecute(ctx) assert.Error(t, err) - rc.updateState(commonpb.StateCode_Abnormal) task.CollectionName = collectionName + + // invalidate collection cache, trigger rootcoord rpc + globalMetaCache.RemoveCollection(ctx, dbName, collectionName) + + // rc return collection not found error + 
rc.describeCollectionFunc = func(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + return nil, merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName) + } + err = task.PreExecute(ctx) + assert.NoError(t, err) + err = task.Execute(ctx) + assert.NoError(t, err) + assert.False(t, task.result.GetValue()) + + // rootcoord failed to get response + rc.updateState(commonpb.StateCode_Abnormal) err = task.PreExecute(ctx) assert.NoError(t, err) err = task.Execute(ctx) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 09bd04b1bc42b..745db68618824 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -435,6 +435,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg if err != nil { return err } + sd.collection.Ref(1) sd.segmentManager.Put(ctx, segments.SegmentTypeSealed, l0Seg) return nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c8ffa4babd23c..40a3212430bfa 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3184,10 +3184,11 @@ type dataCoordConfig struct { SegmentFlushInterval ParamItem `refreshable:"true"` // compaction - EnableCompaction ParamItem `refreshable:"false"` - EnableAutoCompaction ParamItem `refreshable:"true"` - IndexBasedCompaction ParamItem `refreshable:"true"` - CompactionTaskPrioritizer ParamItem `refreshable:"true"` + EnableCompaction ParamItem `refreshable:"false"` + EnableAutoCompaction ParamItem `refreshable:"true"` + IndexBasedCompaction ParamItem `refreshable:"true"` + CompactionTaskPrioritizer ParamItem `refreshable:"true"` + CompactionTaskQueueCapacity ParamItem `refreshable:"false"` CompactionRPCTimeout ParamItem `refreshable:"true"` CompactionMaxParallelTasks ParamItem `refreshable:"true"` @@ -3474,6 +3475,15 
@@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl } p.CompactionTaskPrioritizer.Init(base.mgr) + p.CompactionTaskQueueCapacity = ParamItem{ + Key: "dataCoord.compaction.taskQueueCapacity", + Version: "2.5.0", + DefaultValue: "256", + Doc: `compaction task queue size`, + Export: true, + } + p.CompactionTaskQueueCapacity.Init(base.mgr) + p.CompactionRPCTimeout = ParamItem{ Key: "dataCoord.compaction.rpcTimeout", Version: "2.2.12", @@ -3490,14 +3500,6 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl } p.CompactionMaxParallelTasks.Init(base.mgr) - p.CompactionWorkerParalleTasks = ParamItem{ - Key: "dataCoord.compaction.workerMaxParallelTaskNum", - Version: "2.3.0", - DefaultValue: "2", - Export: true, - } - p.CompactionWorkerParalleTasks.Init(base.mgr) - p.MinSegmentToMerge = ParamItem{ Key: "dataCoord.compaction.min.segment", Version: "2.0.0",