Skip to content

Commit

Permalink
fix: Return all compactTo segments after support split (milvus-io#36361)
Browse files Browse the repository at this point in the history
Related to milvus-io#36360

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Sep 20, 2024
1 parent 3b10085 commit d2c774f
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 99 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (t *clusteringCompactionTask) processStats() error {
if !ok {
return nil
}
resultSegments = append(resultSegments, to.GetID())
resultSegments = append(resultSegments, lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
}

log.Info("clustering compaction stats task finished",
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func TestServer_AlterIndex(t *testing.T) {
catalog: catalog,
indexMeta: indexMeta,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
compactionTo: make(map[int64][]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,7 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) {
}

// GetCompactionTo returns the segment infos of the segments that the given segment is compacted to.
func (m *meta) GetCompactionTo(segmentID int64) (*SegmentInfo, bool) {
func (m *meta) GetCompactionTo(segmentID int64) ([]*SegmentInfo, bool) {
m.RLock()
defer m.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func Test_meta_SetSegmentsCompacting(t *testing.T) {
isCompacting: false,
},
},
compactionTo: make(map[int64]UniqueID),
compactionTo: make(map[int64][]UniqueID),
},
},
args{
Expand Down
25 changes: 16 additions & 9 deletions internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
type SegmentsInfo struct {
segments map[UniqueID]*SegmentInfo
secondaryIndexes segmentInfoIndexes
compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
// A segment can be compacted to only one segment finally in meta.
// Maps the compaction relation: each value lists the segments whose `CompactionFrom` contains the key.
// A segment can now be compacted to multiple segments.
compactionTo map[UniqueID][]UniqueID
}

type segmentInfoIndexes struct {
Expand Down Expand Up @@ -87,7 +88,7 @@ func NewSegmentsInfo() *SegmentsInfo {
coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
channel2Segments: make(map[string]map[UniqueID]*SegmentInfo),
},
compactionTo: make(map[UniqueID]UniqueID),
compactionTo: make(map[UniqueID][]UniqueID),
}
}

Expand Down Expand Up @@ -167,15 +168,21 @@ func (s *SegmentsInfo) GetRealSegmentsForChannel(channel string) []*SegmentInfo
// Return (nil, false) if the given segmentID cannot be found in the meta.
// Return (nil, true) if the given segmentID can be found but has no compaction to.
// Return (notnil, true) if the given segmentID can be found and has compaction to.
func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) {
func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) ([]*SegmentInfo, bool) {
if _, ok := s.segments[fromSegmentID]; !ok {
return nil, false
}
if toID, ok := s.compactionTo[fromSegmentID]; ok {
if to, ok := s.segments[toID]; ok {
return to, true
if compactTos, ok := s.compactionTo[fromSegmentID]; ok {
result := []*SegmentInfo{}
for _, compactTo := range compactTos {
to, ok := s.segments[compactTo]
if !ok {
log.Warn("compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", compactTo))
return nil, true
}
result = append(result, to)
}
log.Warn("unreachable code: compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", toID))
return result, true
}
return nil, true
}
Expand Down Expand Up @@ -380,7 +387,7 @@ func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) {
// addCompactTo adds the compact relation to the segment
func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) {
for _, from := range segment.GetCompactionFrom() {
s.compactionTo[from] = segment.GetID()
s.compactionTo[from] = append(s.compactionTo[from], segment.GetID())
}
}

Expand Down
174 changes: 91 additions & 83 deletions internal/datacoord/segment_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,106 @@ package datacoord
import (
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/proto/datapb"
)

func TestCompactionTo(t *testing.T) {
segments := NewSegmentsInfo()
segment := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
t.Run("mix_2_to_1", func(t *testing.T) {
segments := NewSegmentsInfo()
segment := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
})
segments.SetSegment(segment.GetID(), segment)

compactTos, ok := segments.GetCompactionTo(1)
assert.True(t, ok)
assert.Nil(t, compactTos)

segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 2,
})
segments.SetSegment(segment.GetID(), segment)
segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
CompactionFrom: []int64{1, 2},
})
segments.SetSegment(segment.GetID(), segment)

getCompactToIDs := func(segments []*SegmentInfo) []int64 {
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
}

compactTos, ok = segments.GetCompactionTo(3)
assert.Nil(t, compactTos)
assert.True(t, ok)
compactTos, ok = segments.GetCompactionTo(1)
assert.True(t, ok)
assert.NotNil(t, compactTos)
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))
compactTos, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, compactTos)
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))

// should be dropped.
segments.DropSegment(1)
compactTos, ok = segments.GetCompactionTo(1)
assert.False(t, ok)
assert.Nil(t, compactTos)
compactTos, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, compactTos)
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))
compactTos, ok = segments.GetCompactionTo(3)
assert.Nil(t, compactTos)
assert.True(t, ok)

segments.DropSegment(3)
compactTos, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.Nil(t, compactTos)
})
segments.SetSegment(segment.GetID(), segment)

s, ok := segments.GetCompactionTo(1)
assert.True(t, ok)
assert.Nil(t, s)

segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 2,
})
segments.SetSegment(segment.GetID(), segment)
segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
CompactionFrom: []int64{1, 2},
})
segments.SetSegment(segment.GetID(), segment)

s, ok = segments.GetCompactionTo(3)
assert.Nil(t, s)
assert.True(t, ok)
s, ok = segments.GetCompactionTo(1)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())
s, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())

// should be overwritten.
segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
CompactionFrom: []int64{2},
})
segments.SetSegment(segment.GetID(), segment)

s, ok = segments.GetCompactionTo(3)
assert.True(t, ok)
assert.Nil(t, s)
s, ok = segments.GetCompactionTo(1)
assert.True(t, ok)
assert.Nil(t, s)
s, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())

// should be overwritten back.
segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
CompactionFrom: []int64{1, 2},
t.Run("split_1_to_2", func(t *testing.T) {
segments := NewSegmentsInfo()
segment := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
})
segments.SetSegment(segment.GetID(), segment)

compactTos, ok := segments.GetCompactionTo(1)
assert.True(t, ok)
assert.Nil(t, compactTos)

segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 2,
CompactionFrom: []int64{1},
})
segments.SetSegment(segment.GetID(), segment)
segment = NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
CompactionFrom: []int64{1},
})
segments.SetSegment(segment.GetID(), segment)

getCompactToIDs := func(segments []*SegmentInfo) []int64 {
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
}

compactTos, ok = segments.GetCompactionTo(2)
assert.Nil(t, compactTos)
assert.True(t, ok)
compactTos, ok = segments.GetCompactionTo(3)
assert.Nil(t, compactTos)
assert.True(t, ok)
compactTos, ok = segments.GetCompactionTo(1)
assert.True(t, ok)
assert.NotNil(t, compactTos)
assert.ElementsMatch(t, []int64{2, 3}, getCompactToIDs(compactTos))
})
segments.SetSegment(segment.GetID(), segment)

s, ok = segments.GetCompactionTo(3)
assert.Nil(t, s)
assert.True(t, ok)
s, ok = segments.GetCompactionTo(1)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())
s, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())

// should be dropped.
segments.DropSegment(1)
s, ok = segments.GetCompactionTo(1)
assert.False(t, ok)
assert.Nil(t, s)
s, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, int64(3), s.GetID())
s, ok = segments.GetCompactionTo(3)
assert.Nil(t, s)
assert.True(t, ok)

segments.DropSegment(3)
s, ok = segments.GetCompactionTo(2)
assert.True(t, ok)
assert.Nil(t, s)
}

func TestGetSegmentSize(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
info = s.meta.GetSegment(id)
// TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock.
// Too much modification need to be applied to SegmentInfo, a refactor is needed.
child, ok := s.meta.GetCompactionTo(id)
children, ok := s.meta.GetCompactionTo(id)

// info may be not-nil, but ok is false when the segment is being dropped concurrently.
if info == nil || !ok {
Expand All @@ -439,7 +439,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
}

clonedInfo := info.Clone()
if child != nil {
for _, child := range children {
clonedChild := child.Clone()
// child segment should decompress binlog path
binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs())
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/task_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *statsTaskSuite) SetupSuite() {
},
},
},
compactionTo: map[UniqueID]UniqueID{},
compactionTo: map[UniqueID][]UniqueID{},
},

statsTaskMeta: &statsTaskMeta{
Expand Down

0 comments on commit d2c774f

Please sign in to comment.