diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 0e9caa580618e..7ae2b594aff08 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -301,7 +301,7 @@ func (t *clusteringCompactionTask) processStats() error { if !ok { return nil } - resultSegments = append(resultSegments, to.GetID()) + resultSegments = append(resultSegments, lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...) } log.Info("clustering compaction stats task finished", diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index e0bedad961fde..ce11d49486daa 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -556,7 +556,7 @@ func TestServer_AlterIndex(t *testing.T) { catalog: catalog, indexMeta: indexMeta, segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), + compactionTo: make(map[int64][]int64), segments: map[UniqueID]*SegmentInfo{ invalidSegID: { SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index e9b18e673327b..b113fb82bfdf1 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1651,7 +1651,7 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) { } // GetCompactionTo returns the segment info of the segment to be compacted to. -func (m *meta) GetCompactionTo(segmentID int64) (*SegmentInfo, bool) { +func (m *meta) GetCompactionTo(segmentID int64) ([]*SegmentInfo, bool) { m.RLock() defer m.RUnlock() diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 78bb0078ac9bb..e89221eb184d0 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -861,7 +861,7 @@ func Test_meta_SetSegmentsCompacting(t *testing.T) { isCompacting: false, }, }, - compactionTo: make(map[int64]UniqueID), + compactionTo: make(map[int64][]UniqueID), }, }, args{ diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 5cfaee5a160bb..e9a20e341517b 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -35,8 +35,9 @@ import ( type SegmentsInfo struct { segments map[UniqueID]*SegmentInfo secondaryIndexes segmentInfoIndexes - compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key. - // A segment can be compacted to only one segment finally in meta. + // map the compact relation, value is the segment which `CompactFrom` contains key. + // now segment could be compacted to multiple segments + compactionTo map[UniqueID][]UniqueID } type segmentInfoIndexes struct { @@ -87,7 +88,7 @@ func NewSegmentsInfo() *SegmentsInfo { coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo), channel2Segments: make(map[string]map[UniqueID]*SegmentInfo), }, - compactionTo: make(map[UniqueID]UniqueID), + compactionTo: make(map[UniqueID][]UniqueID), } } @@ -167,15 +168,21 @@ func (s *SegmentsInfo) GetRealSegmentsForChannel(channel string) []*SegmentInfo // Return (nil, false) if given segmentID can not found in the meta. // Return (nil, true) if given segmentID can be found not no compaction to. // Return (notnil, true) if given segmentID can be found and has compaction to. -func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) { +func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) ([]*SegmentInfo, bool) { if _, ok := s.segments[fromSegmentID]; !ok { return nil, false } - if toID, ok := s.compactionTo[fromSegmentID]; ok { - if to, ok := s.segments[toID]; ok { - return to, true + if compactTos, ok := s.compactionTo[fromSegmentID]; ok { + result := []*SegmentInfo{} + for _, compactTo := range compactTos { + to, ok := s.segments[compactTo] + if !ok { + log.Warn("compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", compactTo)) + return nil, true + } + result = append(result, to) } - log.Warn("unreachable code: compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", toID)) + return result, true } return nil, true } @@ -380,7 +387,7 @@ func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) { // addCompactTo adds the compact relation to the segment func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) { for _, from := range segment.GetCompactionFrom() { - s.compactionTo[from] = segment.GetID() + s.compactionTo[from] = append(s.compactionTo[from], segment.GetID()) } } diff --git a/internal/datacoord/segment_info_test.go b/internal/datacoord/segment_info_test.go index 95e9680b78dbb..51f8b19356008 100644 --- a/internal/datacoord/segment_info_test.go +++ b/internal/datacoord/segment_info_test.go @@ -3,98 +3,106 @@ package datacoord import ( "testing" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/internal/proto/datapb" ) func TestCompactionTo(t *testing.T) { - segments := NewSegmentsInfo() - segment := NewSegmentInfo(&datapb.SegmentInfo{ - ID: 1, + t.Run("mix_2_to_1", func(t *testing.T) { + segments := NewSegmentsInfo() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + }) + segments.SetSegment(segment.GetID(), segment) + + compactTos, ok := segments.GetCompactionTo(1) + assert.True(t, ok) + assert.Nil(t, compactTos) + + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 2, + }) + segments.SetSegment(segment.GetID(), segment) + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{1, 2}, + }) + segments.SetSegment(segment.GetID(), segment) + + getCompactToIDs := func(segments []*SegmentInfo) []int64 { + return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() }) + } + + compactTos, ok = segments.GetCompactionTo(3) + assert.Nil(t, compactTos) + assert.True(t, ok) + compactTos, ok = segments.GetCompactionTo(1) + assert.True(t, ok) + assert.NotNil(t, compactTos) + assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos)) + compactTos, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, compactTos) + assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos)) + + // should be droped. + segments.DropSegment(1) + compactTos, ok = segments.GetCompactionTo(1) + assert.False(t, ok) + assert.Nil(t, compactTos) + compactTos, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, compactTos) + assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos)) + compactTos, ok = segments.GetCompactionTo(3) + assert.Nil(t, compactTos) + assert.True(t, ok) + + segments.DropSegment(3) + compactTos, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.Nil(t, compactTos) }) - segments.SetSegment(segment.GetID(), segment) - s, ok := segments.GetCompactionTo(1) - assert.True(t, ok) - assert.Nil(t, s) - - segment = NewSegmentInfo(&datapb.SegmentInfo{ - ID: 2, - }) - segments.SetSegment(segment.GetID(), segment) - segment = NewSegmentInfo(&datapb.SegmentInfo{ - ID: 3, - CompactionFrom: []int64{1, 2}, - }) - segments.SetSegment(segment.GetID(), segment) - - s, ok = segments.GetCompactionTo(3) - assert.Nil(t, s) - assert.True(t, ok) - s, ok = segments.GetCompactionTo(1) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - s, ok = segments.GetCompactionTo(2) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - - // should be overwrite. - segment = NewSegmentInfo(&datapb.SegmentInfo{ - ID: 3, - CompactionFrom: []int64{2}, - }) - segments.SetSegment(segment.GetID(), segment) - - s, ok = segments.GetCompactionTo(3) - assert.True(t, ok) - assert.Nil(t, s) - s, ok = segments.GetCompactionTo(1) - assert.True(t, ok) - assert.Nil(t, s) - s, ok = segments.GetCompactionTo(2) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - - // should be overwrite back. - segment = NewSegmentInfo(&datapb.SegmentInfo{ - ID: 3, - CompactionFrom: []int64{1, 2}, + t.Run("split_1_to_2", func(t *testing.T) { + segments := NewSegmentsInfo() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + }) + segments.SetSegment(segment.GetID(), segment) + + compactTos, ok := segments.GetCompactionTo(1) + assert.True(t, ok) + assert.Nil(t, compactTos) + + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 2, + CompactionFrom: []int64{1}, + }) + segments.SetSegment(segment.GetID(), segment) + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{1}, + }) + segments.SetSegment(segment.GetID(), segment) + + getCompactToIDs := func(segments []*SegmentInfo) []int64 { + return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() }) + } + + compactTos, ok = segments.GetCompactionTo(2) + assert.Nil(t, compactTos) + assert.True(t, ok) + compactTos, ok = segments.GetCompactionTo(3) + assert.Nil(t, compactTos) + assert.True(t, ok) + compactTos, ok = segments.GetCompactionTo(1) + assert.True(t, ok) + assert.NotNil(t, compactTos) + assert.ElementsMatch(t, []int64{2, 3}, getCompactToIDs(compactTos)) }) - segments.SetSegment(segment.GetID(), segment) - - s, ok = segments.GetCompactionTo(3) - assert.Nil(t, s) - assert.True(t, ok) - s, ok = segments.GetCompactionTo(1) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - s, ok = segments.GetCompactionTo(2) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - - // should be droped. - segments.DropSegment(1) - s, ok = segments.GetCompactionTo(1) - assert.False(t, ok) - assert.Nil(t, s) - s, ok = segments.GetCompactionTo(2) - assert.True(t, ok) - assert.NotNil(t, s) - assert.Equal(t, int64(3), s.GetID()) - s, ok = segments.GetCompactionTo(3) - assert.Nil(t, s) - assert.True(t, ok) - - segments.DropSegment(3) - s, ok = segments.GetCompactionTo(2) - assert.True(t, ok) - assert.Nil(t, s) } func TestGetSegmentSize(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 779b77c4e14e1..da0bdaca1b7cb 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -428,7 +428,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR info = s.meta.GetSegment(id) // TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock. // Too much modification need to be applied to SegmentInfo, a refactor is needed. - child, ok := s.meta.GetCompactionTo(id) + children, ok := s.meta.GetCompactionTo(id) // info may be not-nil, but ok is false when the segment is being dropped concurrently. if info == nil || !ok { @@ -439,7 +439,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } clonedInfo := info.Clone() - if child != nil { + for _, child := range children { clonedChild := child.Clone() // child segment should decompress binlog path binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs()) diff --git a/internal/datacoord/task_stats_test.go b/internal/datacoord/task_stats_test.go index 26dbe53b04c56..871ea5740ba40 100644 --- a/internal/datacoord/task_stats_test.go +++ b/internal/datacoord/task_stats_test.go @@ -102,7 +102,7 @@ func (s *statsTaskSuite) SetupSuite() { }, }, }, - compactionTo: map[UniqueID]UniqueID{}, + compactionTo: map[UniqueID][]UniqueID{}, }, statsTaskMeta: &statsTaskMeta{