Test01 #68

Open
wants to merge 8 commits into base: master
1 change: 1 addition & 0 deletions .github/mergify.yml
@@ -502,6 +502,7 @@ pull_request_rules:
add:
- kind/feature


- name: Label enhancement PRs
conditions:
# branch condition: in this pull request, the changes are based on any branch referenced by BRANCHES
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -559,9 +559,9 @@ dataCoord:
# level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
# mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.
taskPrioritizer: default
taskQueueCapacity: 256 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
    dropTolerance: 86400 # A finished compaction task is cleaned up after it has been done for longer than this time (in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
clustering:
5 changes: 5 additions & 0 deletions internal/core/src/common/Chunk.h
@@ -66,6 +66,11 @@ class Chunk {
return data_;
}

+    const char*
+    RawData() const {
+        return data_;
+    }

virtual bool
isValid(int offset) {
return valid_[offset];
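The new RawData() accessor is a const twin of Data(): it exposes the chunk's backing buffer without granting mutable access. The following is a minimal sketch of the kind of read-only use it enables; the ReadInt64At helper is hypothetical, not part of the PR, and assumes a plain fixed-width INT64 layout with no header or null bitmap before the values.

#include <cstdint>

// Hypothetical helper: read one value through the read-only accessor.
// Real chunks may carry extra headers; this only illustrates the API shape.
int64_t
ReadInt64At(const Chunk& chunk, int64_t row) {
    const char* raw = chunk.RawData();  // const view, cannot mutate the chunk
    return reinterpret_cast<const int64_t*>(raw)[row];
}
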
7 changes: 4 additions & 3 deletions internal/core/src/exec/expression/CompareExpr.cpp
@@ -64,15 +64,16 @@ PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op) {
right_current_chunk_id_,
right_current_chunk_pos_);
        for (int i = 0; i < real_batch_size; ++i) {
-           if (!left().has_value() || !right().has_value()) {
+           auto left_value = left(), right_value = right();
+           if (!left_value.has_value() || !right_value.has_value()) {
                res[i] = false;
                valid_res[i] = false;
                continue;
            }
            res[i] =
                boost::apply_visitor(milvus::query::Relational<decltype(op)>{},
-                                    left().value(),
-                                    right().value());
+                                    left_value.value(),
+                                    right_value.value());
        }
return res_vec;
} else {
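The change above fixes a subtle correctness bug: left and right are stateful accessors that advance an internal cursor on every call (see the current_chunk_pos++ in SegmentChunkReader below), so invoking them once for the null check and again inside boost::apply_visitor consumed two rows per iteration and compared the wrong pair. A self-contained sketch of the hazard, using a hypothetical accessor:

#include <cassert>
#include <optional>
#include <vector>

int main() {
    std::vector<int> rows{10, 20, 30};
    size_t pos = 0;
    // Stateful accessor: every call consumes one row, like left()/right().
    auto next = [&]() -> std::optional<int> {
        if (pos >= rows.size()) return std::nullopt;
        return rows[pos++];
    };

    // Buggy shape: next() in the check consumes row 0 and next() in the
    // use consumes row 1, so the value tested is never the value used:
    //   if (next().has_value()) { use(next().value()); }

    // Fixed shape from the diff: call once and cache the result.
    auto value = next();
    if (value.has_value()) {
        assert(value.value() == 10);  // checked and used row are the same
    }
    return 0;
}
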
2 changes: 1 addition & 1 deletion internal/core/src/mmap/ChunkedColumn.h
@@ -83,7 +83,7 @@ class ChunkedColumnBase : public ColumnBase {
AssertInfo(chunks_.size() == 1,
"only support one chunk, but got {} chunk(s)",
chunks_.size());
-        return chunks_[0]->Data();
+        return chunks_[0]->RawData();
}

bool
8 changes: 8 additions & 0 deletions internal/core/src/segcore/CMakeLists.txt
@@ -12,3 +12,11 @@

add_source_at_current_directory_recursively()
add_library(milvus_segcore OBJECT ${SOURCE_FILES})

+if(CMAKE_BUILD_TYPE STREQUAL "Debug")
+    set(CHECK_SORTED 1)
+else()
+    set(CHECK_SORTED 0)
+endif()
+
+add_definitions(-DCHECK_SORTED=${CHECK_SORTED})
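Note the value must be 1/0 rather than ON/OFF: the macro is always defined via -D, so the consumer has to test its value with #if (as corrected in SegmentSealedImpl.cpp below); with #ifdef the check would be active in Release builds too. With CHECK_SORTED=1 only in Debug builds, the ordering checks compile out of Release binaries entirely. A short sketch of the pattern this enables; the DebugAssertSorted helper is hypothetical, the PR itself uses the ASSERT_COLUMN_ORDERED macro shown below:

#include <cassert>
#include <cstdint>

// Hypothetical illustration: a check that exists only when the build
// defines CHECK_SORTED=1 (Debug); in Release it compiles to nothing.
template <typename T>
void
DebugAssertSorted(const T* data, int64_t n) {
#if CHECK_SORTED
    for (int64_t i = 1; i < n; ++i) {
        assert(data[i - 1] <= data[i] && "column is not ordered");
    }
#else
    (void)data;  // suppress unused-parameter warnings in Release
    (void)n;
#endif
}
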
33 changes: 8 additions & 25 deletions internal/core/src/segcore/SegmentChunkReader.cpp
@@ -25,21 +25,13 @@ SegmentChunkReader::GetChunkDataAccessor(FieldId field_id,
        if (index) {
            auto& indexing = const_cast<index::ScalarIndex<T>&>(
                segment_->chunk_scalar_index<T>(field_id, current_chunk_id));
-           auto current_chunk_size = segment_->type() == SegmentType::Growing
-                                         ? SizePerChunk()
-                                         : active_count_;
+
            if (indexing.HasRawData()) {
-               return [&, current_chunk_size]() -> const data_access_type {
-                   if (current_chunk_pos >= current_chunk_size) {
-                       current_chunk_id++;
-                       current_chunk_pos = 0;
-                       indexing = const_cast<index::ScalarIndex<T>&>(
-                           segment_->chunk_scalar_index<T>(field_id,
-                                                           current_chunk_id));
+               return [&]() -> const data_access_type {
+                   if (current_chunk_pos >= active_count_) {
                        return std::nullopt;
                    }
-                   auto raw = indexing.Reverse_Lookup(current_chunk_pos);
-                   current_chunk_pos++;
+                   auto raw = indexing.Reverse_Lookup(current_chunk_pos++);
                    if (!raw.has_value()) {
                        return std::nullopt;
                    }
@@ -85,21 +77,12 @@ SegmentChunkReader::GetChunkDataAccessor<std::string>(
        auto& indexing = const_cast<index::ScalarIndex<std::string>&>(
            segment_->chunk_scalar_index<std::string>(field_id,
                                                      current_chunk_id));
-       auto current_chunk_size = segment_->type() == SegmentType::Growing
-                                     ? SizePerChunk()
-                                     : active_count_;
-
        if (indexing.HasRawData()) {
-           return [&, current_chunk_size]() mutable -> const data_access_type {
-               if (current_chunk_pos >= current_chunk_size) {
-                   current_chunk_id++;
-                   current_chunk_pos = 0;
-                   indexing = const_cast<index::ScalarIndex<std::string>&>(
-                       segment_->chunk_scalar_index<std::string>(
-                           field_id, current_chunk_id));
+           return [&]() mutable -> const data_access_type {
+               if (current_chunk_pos >= active_count_) {
+                   return std::nullopt;
                }
-               auto raw = indexing.Reverse_Lookup(current_chunk_pos);
-               current_chunk_pos++;
+               auto raw = indexing.Reverse_Lookup(current_chunk_pos++);
                if (!raw.has_value()) {
                    return std::nullopt;
                }
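Both specializations drop the per-chunk bookkeeping: instead of advancing current_chunk_id and re-fetching the index at each chunk boundary, the accessor now walks one flat cursor from 0 to active_count_ and reverse-looks-up each position. A self-contained sketch of the simplified accessor shape, with a hypothetical stand-in for index::ScalarIndex:

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

// Hypothetical stand-in for index::ScalarIndex<T>::Reverse_Lookup.
struct FakeIndex {
    std::vector<int64_t> values;
    std::optional<int64_t>
    Reverse_Lookup(int64_t pos) const {
        if (pos < 0 || pos >= static_cast<int64_t>(values.size())) {
            return std::nullopt;
        }
        return values[pos];
    }
};

// Shape of the simplified accessor: one flat cursor over active_count,
// std::nullopt once exhausted, no chunk-boundary handling.
std::function<std::optional<int64_t>()>
MakeAccessor(std::shared_ptr<FakeIndex> index, int64_t active_count) {
    auto pos = std::make_shared<int64_t>(0);
    return [index, active_count, pos]() -> std::optional<int64_t> {
        if (*pos >= active_count) {
            return std::nullopt;
        }
        return index->Reverse_Lookup((*pos)++);
    };
}
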
137 changes: 54 additions & 83 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -54,6 +54,41 @@

namespace milvus::segcore {

+#if CHECK_SORTED
+#define ASSERT_COLUMN_ORDERED(data_type, column)                          \
+    {                                                                     \
+        switch (data_type) {                                              \
+            case DataType::INT64: {                                       \
+                auto col =                                                 \
+                    std::dynamic_pointer_cast<SingleChunkColumn>(column);  \
+                auto pks = reinterpret_cast<const int64_t*>(col->Data());  \
+                for (int i = 1; i < col->NumRows(); ++i) {                 \
+                    assert(pks[i - 1] <= pks[i] &&                         \
+                           "INT64 Column is not ordered!");                \
+                }                                                          \
+                break;                                                     \
+            }                                                              \
+            case DataType::VARCHAR: {                                      \
+                auto col = std::dynamic_pointer_cast<                      \
+                    SingleChunkVariableColumn<std::string>>(column);       \
+                auto pks = col->Views();                                   \
+                for (int i = 1; i < col->NumRows(); ++i) {                 \
+                    assert(pks[i - 1] <= pks[i] &&                         \
+                           "VARCHAR Column is not ordered!");              \
+                }                                                          \
+                break;                                                     \
+            }                                                              \
+            default: {                                                     \
+                PanicInfo(DataTypeInvalid,                                 \
+                          fmt::format("unsupported primary key data type {}", \
+                                      data_type));                         \
+            }                                                              \
+        }                                                                  \
+    }
+#else
+#define ASSERT_COLUMN_ORDERED(data_type, column) ((void)0)
+#endif

static inline void
set_bit(BitsetType& bitset, FieldId field_id, bool flag = true) {
auto pos = field_id.get() - START_USER_FIELDID;
@@ -458,11 +493,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
// set pks to offset
// if the segments are already sorted by pk, there is no need to build a pk offset index.
// it can directly perform a binary search on the pk column.
-   if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) {
-       AssertInfo(field_id.get() != -1, "Primary key is -1");
-       AssertInfo(insert_record_.empty_pks(), "already exists");
-       insert_record_.insert_pks(data_type, column);
-       insert_record_.seal_pks();
+   if (schema_->get_primary_field_id() == field_id) {
+       if (!is_sorted_by_pk_) {
+           AssertInfo(field_id.get() != -1, "Primary key is -1");
+           AssertInfo(insert_record_.empty_pks(), "already exists");
+           insert_record_.insert_pks(data_type, column);
+           insert_record_.seal_pks();
+       } else {
+           ASSERT_COLUMN_ORDERED(data_type, column);
+       }
    }

bool use_temp_index = false;
@@ -889,74 +928,6 @@ SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const {
return pk_offsets;
}

-std::shared_ptr<DeletedRecord::TmpBitmap>
-SegmentSealedImpl::get_deleted_bitmap_s(int64_t del_barrier,
-                                        int64_t insert_barrier,
-                                        DeletedRecord& delete_record,
-                                        Timestamp query_timestamp) const {
-    // if insert_barrier and del_barrier have not changed, use cache data directly
-    bool hit_cache = false;
-    int64_t old_del_barrier = 0;
-    auto current = delete_record.clone_lru_entry(
-        insert_barrier, del_barrier, old_del_barrier, hit_cache);
-    if (hit_cache) {
-        return current;
-    }
-
-    auto bitmap = current->bitmap_ptr;
-
-    int64_t start, end;
-    if (del_barrier < old_del_barrier) {
-        // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
-        // so these deletion records do not take effect in query/search
-        // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
-        // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
-        start = del_barrier;
-        end = old_del_barrier;
-    } else {
-        // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
-        // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
-        start = old_del_barrier;
-        end = del_barrier;
-    }
-
-    // Avoid invalid calculations when there are a lot of repeated delete pks
-    std::unordered_map<PkType, Timestamp> delete_timestamps;
-    for (auto del_index = start; del_index < end; ++del_index) {
-        auto pk = delete_record.pks()[del_index];
-        auto timestamp = delete_record.timestamps()[del_index];
-
-        delete_timestamps[pk] = timestamp > delete_timestamps[pk]
-                                    ? timestamp
-                                    : delete_timestamps[pk];
-    }
-
-    for (auto& [pk, timestamp] : delete_timestamps) {
-        auto segOffsets = search_pk(pk, insert_barrier);
-        for (auto offset : segOffsets) {
-            int64_t insert_row_offset = offset.get();
-
-            // The deletion record do not take effect in search/query,
-            // and reset bitmap to 0
-            if (timestamp > query_timestamp) {
-                bitmap->reset(insert_row_offset);
-                continue;
-            }
-            // Insert after delete with same pk, delete will not task effect on this insert record,
-            // and reset bitmap to 0
-            if (insert_record_.timestamps_[offset.get()] >= timestamp) {
-                bitmap->reset(insert_row_offset);
-                continue;
-            }
-            // insert data corresponding to the insert_row_offset will be ignored in search/query
-            bitmap->set(insert_row_offset);
-        }
-    }
-
-    delete_record.insert_lru_entry(current);
-    return current;
-}

void
SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
@@ -968,16 +939,16 @@ SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

-   if (!is_sorted_by_pk_) {
-       bitmap_holder = get_deleted_bitmap(del_barrier,
-                                          ins_barrier,
-                                          deleted_record_,
-                                          insert_record_,
-                                          timestamp);
-   } else {
-       bitmap_holder = get_deleted_bitmap_s(
-           del_barrier, ins_barrier, deleted_record_, timestamp);
-   }
+   auto search_fn = [this](const PkType& pk, int64_t barrier) {
+       return this->search_pk(pk, barrier);
+   };
+   bitmap_holder = get_deleted_bitmap(del_barrier,
+                                      ins_barrier,
+                                      deleted_record_,
+                                      insert_record_,
+                                      timestamp,
+                                      is_sorted_by_pk_,
+                                      search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
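The thread running through these changes is the comment in LoadFieldData: a segment already sorted by primary key needs no pk-to-offset index, because search_pk can binary-search the pk column directly, and mask_with_delete now routes that lookup into the shared get_deleted_bitmap through a callback. The actual search_pk body is not part of this diff; the following is only a hedged sketch of what such a lookup could look like for an INT64 pk column:

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical sketch, not the PR's implementation: find every offset of
// `pk` in a pk-sorted column, restricted to rows below insert_barrier.
std::vector<int64_t>
SearchPkSorted(const int64_t* pks,
               int64_t num_rows,
               int64_t pk,
               int64_t insert_barrier) {
    std::vector<int64_t> offsets;
    // O(log n) equal-range lookup; this is why sorted segments can skip
    // building the auxiliary pk offset index entirely.
    auto [lo, hi] = std::equal_range(pks, pks + num_rows, pk);
    for (auto it = lo; it != hi; ++it) {
        int64_t offset = it - pks;
        if (offset < insert_barrier) {
            offsets.push_back(offset);
        }
    }
    return offsets;
}
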
6 changes: 0 additions & 6 deletions internal/core/src/segcore/SegmentSealedImpl.h
@@ -121,12 +121,6 @@ class SegmentSealedImpl : public SegmentSealed {
std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const;

-   std::shared_ptr<DeletedRecord::TmpBitmap>
-   get_deleted_bitmap_s(int64_t del_barrier,
-                        int64_t insert_barrier,
-                        DeletedRecord& delete_record,
-                        Timestamp query_timestamp) const;

std::unique_ptr<DataArray>
get_vector(FieldId field_id,
const int64_t* ids,
18 changes: 12 additions & 6 deletions internal/core/src/segcore/Utils.h
@@ -110,11 +110,15 @@ MergeDataArray(std::vector<MergeBase>& merge_bases,

template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
-get_deleted_bitmap(int64_t del_barrier,
-                   int64_t insert_barrier,
-                   DeletedRecord& delete_record,
-                   const InsertRecord<is_sealed>& insert_record,
-                   Timestamp query_timestamp) {
+get_deleted_bitmap(
+    int64_t del_barrier,
+    int64_t insert_barrier,
+    DeletedRecord& delete_record,
+    const InsertRecord<is_sealed>& insert_record,
+    Timestamp query_timestamp,
+    bool is_sorted_by_pk = false,
+    const std::function<std::vector<SegOffset>(const PkType&, int64_t)>&
+        search_fn = nullptr) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
@@ -153,7 +157,9 @@
}

for (auto& [pk, timestamp] : delete_timestamps) {
-       auto segOffsets = insert_record.search_pk(pk, insert_barrier);
+       auto segOffsets = is_sorted_by_pk
+                             ? search_fn(pk, insert_barrier)
+                             : insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

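Because the two new parameters are defaulted, existing growing-segment call sites keep compiling unchanged, while sorted sealed segments inject their own lookup. A self-contained sketch of the dispatch the diff adds, with hypothetical stand-ins for the Milvus types:

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for PkType / SegOffset / InsertRecord.
using PkType = int64_t;
using SegOffset = int64_t;
using SearchFn = std::function<std::vector<SegOffset>(const PkType&, int64_t)>;

struct FakeInsertRecord {
    std::vector<SegOffset>
    search_pk(const PkType&, int64_t) const {
        return {0};  // pretend the pk always lives at offset 0
    }
};

// Mirrors the branch added inside get_deleted_bitmap: sorted segments use
// the injected callback, everything else falls back to the insert record.
std::vector<SegOffset>
LookupOffsets(const FakeInsertRecord& insert_record,
              const PkType& pk,
              int64_t insert_barrier,
              bool is_sorted_by_pk = false,
              const SearchFn& search_fn = nullptr) {
    return is_sorted_by_pk ? search_fn(pk, insert_barrier)
                           : insert_record.search_pk(pk, insert_barrier);
}

int main() {
    FakeInsertRecord rec;
    auto sorted = LookupOffsets(
        rec, /*pk=*/42, /*insert_barrier=*/100, /*is_sorted_by_pk=*/true,
        [](const PkType&, int64_t) { return std::vector<SegOffset>{7}; });
    std::cout << sorted[0] << "\n";  // 7: produced by the injected callback
    return 0;
}
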
9 changes: 8 additions & 1 deletion internal/datacoord/compaction.go
@@ -37,9 +37,12 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

+// TODO: for now we only warn about long-executing/long-queuing tasks.
+// Long-queuing tasks should eventually be eliminated, because compaction scheduling is only locally optimal.
var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{
datapb.CompactionType_MixCompaction: 30 * time.Minute,
datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute,
@@ -180,8 +183,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
) *compactionPlanHandler {
+	// A higher queue capacity yields better priority ordering but consumes more memory.
+	// TODO[GOOSE]: a higher capacity also makes tasks wait longer, which should be addressed.
+	capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
	return &compactionPlanHandler{
-		queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory.
+		queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
chManager: cm,
meta: meta,
sessions: sessions,
Expand Down Expand Up @@ -293,6 +299,7 @@ func (c *compactionPlanHandler) loadMeta() {
state := task.GetState()
if state == datapb.CompactionTaskState_completed ||
state == datapb.CompactionTaskState_cleaned ||
+			state == datapb.CompactionTaskState_timeout ||
state == datapb.CompactionTaskState_unknown {
log.Info("compactionPlanHandler loadMeta abandon compactionTask",
zap.Int64("planID", task.GetPlanID()),