From 6d9007c0fbe7330b1334a17a1a48e560c1a139b6 Mon Sep 17 00:00:00 2001
From: Andrew Kimball
Date: Wed, 13 Nov 2024 22:38:41 -0800
Subject: [PATCH] vecindex: enable background fixup processing

The vector index now starts up a background goroutine that will process
split, merge, and other fixups for the index. A new testing command
validates the resulting index.

Epic: CRDB-42943

Release note: None
---
 pkg/sql/vecindex/fixup_processor.go           | 41 ++++++++-
 pkg/sql/vecindex/fixup_processor_test.go      |  2 +-
 .../vecindex/testdata/background-fixups.ddt   | 12 +++
 pkg/sql/vecindex/testdata/search-features.ddt | 72 ++++++++--------
 pkg/sql/vecindex/vector_index.go              | 23 +++++
 pkg/sql/vecindex/vector_index_test.go         | 85 ++++++++++++++++++-
 6 files changed, 192 insertions(+), 43 deletions(-)
 create mode 100644 pkg/sql/vecindex/testdata/background-fixups.ddt

diff --git a/pkg/sql/vecindex/fixup_processor.go b/pkg/sql/vecindex/fixup_processor.go
index 53ce1ace39a2..98c927f1c04e 100644
--- a/pkg/sql/vecindex/fixup_processor.go
+++ b/pkg/sql/vecindex/fixup_processor.go
@@ -8,6 +8,7 @@ package vecindex
 import (
 	"context"
 	"math/rand"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal"
@@ -87,6 +88,10 @@ type fixupProcessor struct {
 	// maxFixups limit has been reached.
 	fixupsLimitHit log.EveryN
 
+	// pendingCount tracks the number of pending fixups that still need to be
+	// processed.
+	pendingCount sync.WaitGroup
+
 	// --------------------------------------------------
 	// The following fields should only be accessed on a single background
 	// goroutine (or a single foreground goroutine in deterministic tests).
@@ -131,8 +136,34 @@ func (fp *fixupProcessor) AddSplit(
 	})
 }
 
-// runAll processes all fixups in the queue. This should only be called by
-// tests on one foreground goroutine.
+// Start is meant to be called on a background goroutine. It runs until the
+// provided context is canceled, processing fixups as they are added to the
+// fixup processor.
+func (fp *fixupProcessor) Start(ctx context.Context) {
+	for {
+		// Wait to run the next fixup in the queue.
+		ok, err := fp.run(ctx, true /* wait */)
+		if err != nil {
+			// This is a background goroutine, so just log error and continue.
+			log.Errorf(ctx, "fixup processor error: %v", err)
+			continue
+		}
+
+		if !ok {
+			// Context was canceled, so exit.
+			return
+		}
+	}
+}
+
+// Wait blocks until all pending fixups have been processed by the background
+// goroutine. This is useful in testing.
+func (fp *fixupProcessor) Wait() {
+	fp.pendingCount.Wait()
+}
+
+// runAll processes all fixups in the queue. This should only be called by tests
+// on one foreground goroutine, and only in cases where Start was not called.
 func (fp *fixupProcessor) runAll(ctx context.Context) error {
 	for {
 		ok, err := fp.run(ctx, false /* wait */)
@@ -188,6 +219,9 @@ func (fp *fixupProcessor) run(ctx context.Context, wait bool) (ok bool, err erro
 	fp.mu.Lock()
 	defer fp.mu.Unlock()
 
+	// Decrement the number of pending fixups.
+	fp.pendingCount.Done()
+
 	switch next.Type {
 	case splitFixup:
 		key := partitionFixupKey{Type: next.Type, PartitionKey: next.PartitionKey}
@@ -222,6 +256,9 @@ func (fp *fixupProcessor) addFixup(ctx context.Context, fixup fixup) {
 		fp.mu.pendingPartitions[key] = true
 	}
 
+	// Increment the number of pending fixups.
+	fp.pendingCount.Add(1)
+
 	// Note that the channel send operation should never block, since it has
 	// maxFixups capacity.
 	fp.fixups <- fixup
diff --git a/pkg/sql/vecindex/fixup_processor_test.go b/pkg/sql/vecindex/fixup_processor_test.go
index 1dfa51117190..8bf115ef0483 100644
--- a/pkg/sql/vecindex/fixup_processor_test.go
+++ b/pkg/sql/vecindex/fixup_processor_test.go
@@ -22,7 +22,7 @@ func TestSplitPartitionData(t *testing.T) {
 	quantizer := quantize.NewRaBitQuantizer(2, 42)
 	store := vecstore.NewInMemoryStore(2, 42)
 	options := VectorIndexOptions{Seed: 42}
-	index, err := NewVectorIndex(ctx, store, quantizer, &options)
+	index, err := NewVectorIndex(ctx, store, quantizer, &options, nil /* stopper */)
 	require.NoError(t, err)
 
 	vectors := vector.MakeSetFromRawData([]float32{
diff --git a/pkg/sql/vecindex/testdata/background-fixups.ddt b/pkg/sql/vecindex/testdata/background-fixups.ddt
new file mode 100644
index 000000000000..cfec79b67793
--- /dev/null
+++ b/pkg/sql/vecindex/testdata/background-fixups.ddt
@@ -0,0 +1,12 @@
+# Load 500 512-dimension features with background fixups enabled. Validate the
+# resulting tree. Note that using background fixups means that the index build
+# is non-deterministic, so there are limited validations we can do.
+
+new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 background-fixups hide-tree
+----
+Created index with 500 vectors with 512 dimensions.
+
+# Traverse the complete tree and ensure that all 500 vectors are present.
+validate-tree
+----
+Validated index with 500 vectors.
diff --git a/pkg/sql/vecindex/testdata/search-features.ddt b/pkg/sql/vecindex/testdata/search-features.ddt
index b29e706785ec..1a38db0b31f6 100644
--- a/pkg/sql/vecindex/testdata/search-features.ddt
+++ b/pkg/sql/vecindex/testdata/search-features.ddt
@@ -1,75 +1,75 @@
-# Load 1000 512-dimension features and search them. Use small partition size to
+# Load 500 512-dimension features and search them. Use small partition size to
 # ensure a deeper tree.
-new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=1000 hide-tree
+new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 hide-tree
 ----
-Created index with 1000 vectors with 512 dimensions.
+Created index with 500 vectors with 512 dimensions.
 
 # Start with 1 result and default beam size of 2.
 search max-results=1 use-feature=5000
 ----
 vec302: 0.6601 (centroid=0.4138)
-14 leaf vectors, 33 vectors, 4 full vectors, 5 partitions
+9 leaf vectors, 25 vectors, 3 full vectors, 5 partitions
 
 # Search for additional results.
 search max-results=6 use-feature=5000
 ----
 vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
-vec386: 0.7301 (centroid=0.5117)
-vec240: 0.7723 (centroid=0.4702)
 vec347: 0.7745 (centroid=0.6267)
 vec11: 0.777 (centroid=0.5067)
-14 leaf vectors, 33 vectors, 10 full vectors, 5 partitions
+vec208: 0.9026 (centroid=0.5818)
+vec435: 0.9124 (centroid=0.7488)
+vec223: 0.9145 (centroid=0.5778)
+9 leaf vectors, 25 vectors, 9 full vectors, 5 partitions
 
 # Use a larger beam size.
 search max-results=6 use-feature=5000 beam-size=8
 ----
-vec771: 0.5624 (centroid=0.4676)
 vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
-vec386: 0.7301 (centroid=0.5117)
-vec240: 0.7723 (centroid=0.4702)
+vec329: 0.6871 (centroid=0.4672)
+vec386: 0.7301 (centroid=0.6006)
+vec309: 0.7311 (centroid=0.4039)
+vec240: 0.7723 (centroid=0.5003)
 vec347: 0.7745 (centroid=0.6267)
-50 leaf vectors, 91 vectors, 12 full vectors, 15 partitions
+46 leaf vectors, 81 vectors, 22 full vectors, 15 partitions
 
 # Turn off re-ranking, which results in increased inaccuracy.
 search max-results=6 use-feature=5000 beam-size=8 skip-rerank
 ----
-vec771: 0.5499 ±0.0291 (centroid=0.4676)
 vec302: 0.6246 ±0.0274 (centroid=0.4138)
-vec329: 0.6609 ±0.0333 (centroid=0.5033)
-vec386: 0.7245 ±0.0338 (centroid=0.5117)
+vec329: 0.6501 ±0.031 (centroid=0.4672)
+vec386: 0.7132 ±0.0398 (centroid=0.6006)
 vec347: 0.7279 ±0.0415 (centroid=0.6267)
+vec309: 0.73 ±0.0273 (centroid=0.4039)
 vec11: 0.7509 ±0.0336 (centroid=0.5067)
-50 leaf vectors, 91 vectors, 0 full vectors, 15 partitions
+46 leaf vectors, 81 vectors, 0 full vectors, 15 partitions
 
 # Return top 25 results with large beam size.
 search max-results=25 use-feature=5000 beam-size=64
 ----
-vec771: 0.5624 (centroid=0.4676)
-vec356: 0.5976 (centroid=0.5117)
-vec640: 0.6525 (centroid=0.6139)
+vec356: 0.5976 (centroid=0.5821)
 vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
+vec329: 0.6871 (centroid=0.4672)
 vec95: 0.7008 (centroid=0.5542)
-vec249: 0.7268 (centroid=0.3715)
-vec386: 0.7301 (centroid=0.5117)
-vec309: 0.7311 (centroid=0.4912)
-vec633: 0.7513 (centroid=0.4095)
+vec249: 0.7268 (centroid=0.4042)
+vec386: 0.7301 (centroid=0.6006)
+vec309: 0.7311 (centroid=0.4039)
 vec117: 0.7576 (centroid=0.4538)
-vec556: 0.7595 (centroid=0.5531)
 vec25: 0.761 (centroid=0.4576)
-vec872: 0.7707 (centroid=0.6427)
-vec859: 0.7708 (centroid=0.6614)
-vec240: 0.7723 (centroid=0.4702)
+vec240: 0.7723 (centroid=0.5003)
 vec347: 0.7745 (centroid=0.6267)
 vec11: 0.777 (centroid=0.5067)
-vec340: 0.7858 (centroid=0.4752)
-vec239: 0.7878 (centroid=0.4584)
-vec704: 0.7916 (centroid=0.7117)
-vec423: 0.7956 (centroid=0.4608)
+vec340: 0.7858 (centroid=0.5942)
+vec239: 0.7878 (centroid=0.4654)
+vec423: 0.7956 (centroid=0.6041)
 vec220: 0.7957 (centroid=0.4226)
-vec387: 0.8038 (centroid=0.4652)
-vec637: 0.8039 (centroid=0.5211)
-356 leaf vectors, 567 vectors, 97 full vectors, 103 partitions
+vec387: 0.8038 (centroid=0.4954)
+vec410: 0.8062 (centroid=0.4643)
+vec52: 0.8068 (centroid=0.5793)
+vec379: 0.8082 (centroid=0.3691)
+vec457: 0.8084 (centroid=0.5842)
+vec184: 0.8139 (centroid=0.4777)
+vec246: 0.8141 (centroid=0.5273)
+vec493: 0.8184 (centroid=0.6874)
+vec129: 0.8211 (centroid=0.432)
+338 leaf vectors, 453 vectors, 78 full vectors, 85 partitions
diff --git a/pkg/sql/vecindex/vector_index.go b/pkg/sql/vecindex/vector_index.go
index 71364cc21a1b..249c19b8c6c7 100644
--- a/pkg/sql/vecindex/vector_index.go
+++ b/pkg/sql/vecindex/vector_index.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
 	"github.com/cockroachdb/cockroach/pkg/util/num32"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/vector"
 	"github.com/cockroachdb/errors"
 )
@@ -127,15 +128,21 @@ type VectorIndex struct {
 	// fixups runs index maintenance operations like split and merge on a
 	// background goroutine.
 	fixups fixupProcessor
+	// cancel stops the background fixup processing goroutine.
+	cancel func()
 }
 
 // NewVectorIndex constructs a new vector index instance. Typically, only one
 // VectorIndex instance should be created for each index in the process.
+// NOTE: If "stopper" is not nil, then the vector index will start a background
+// goroutine to process index fixups. When the index is no longer needed, the
+// caller must call Close to shut down the background goroutine.
 func NewVectorIndex(
 	ctx context.Context,
 	store vecstore.Store,
 	quantizer quantize.Quantizer,
 	options *VectorIndexOptions,
+	stopper *stop.Stopper,
 ) (*VectorIndex, error) {
 	vi := &VectorIndex{
 		options: *options,
@@ -158,9 +165,25 @@ func NewVectorIndex(
 
 	vi.fixups.Init(vi, options.Seed)
 
+	if stopper != nil {
+		// Start the background goroutine.
+		ctx, vi.cancel = stopper.WithCancelOnQuiesce(ctx)
+		err := stopper.RunAsyncTask(ctx, "vecindex-fixups", vi.fixups.Start)
+		if err != nil {
+			return nil, errors.Wrap(err, "starting fixup processor")
+		}
+	}
+
 	return vi, nil
 }
 
+// Close cancels the background goroutine, if it's running.
+func (vi *VectorIndex) Close() {
+	if vi.cancel != nil {
+		vi.cancel()
+	}
+}
+
 // CreateRoot creates an empty root partition in the store. This should only be
 // called once when the index is first created.
 func (vi *VectorIndex) CreateRoot(ctx context.Context, txn vecstore.Txn) error {
diff --git a/pkg/sql/vecindex/vector_index_test.go b/pkg/sql/vecindex/vector_index_test.go
index 85c4de85b9b5..c311a0fd270d 100644
--- a/pkg/sql/vecindex/vector_index_test.go
+++ b/pkg/sql/vecindex/vector_index_test.go
@@ -17,14 +17,22 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/testutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/vector"
 	"github.com/cockroachdb/datadriven"
 	"github.com/stretchr/testify/require"
 )
 
 func TestDataDriven(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	ctx := internal.WithWorkspace(context.Background(), &internal.Workspace{})
-	state := testState{T: t, Ctx: ctx}
+	state := testState{T: t, Ctx: ctx, Stopper: stop.NewStopper()}
+	defer state.Stopper.Stop(ctx)
+
 	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
 		if !strings.HasSuffix(path, ".ddt") {
 			// Skip files that are not data-driven tests.
@@ -46,6 +54,9 @@ func TestDataDriven(t *testing.T) {
 
 			case "delete":
 				return state.Delete(d)
+
+			case "validate-tree":
+				return state.ValidateTree(d)
 			}
 
 			t.Fatalf("unknown cmd: %s", d.Cmd)
@@ -57,6 +68,7 @@ func TestDataDriven(t *testing.T) {
 type testState struct {
 	T          *testing.T
 	Ctx        context.Context
+	Stopper    *stop.Stopper
 	Quantizer  quantize.Quantizer
 	InMemStore *vecstore.InMemoryStore
 	Index      *VectorIndex
@@ -66,6 +78,7 @@ type testState struct {
 
 func (s *testState) NewIndex(d *datadriven.TestData) string {
 	var err error
+	var stopper *stop.Stopper
 	dims := 2
 	s.Options = VectorIndexOptions{Seed: 42}
 	for _, arg := range d.CmdArgs {
@@ -94,12 +107,16 @@ func (s *testState) NewIndex(d *datadriven.TestData) string {
 			require.Len(s.T, arg.Vals, 1)
 			s.Options.BaseBeamSize, err = strconv.Atoi(arg.Vals[0])
 			require.NoError(s.T, err)
+
+		case "background-fixups":
+			require.Len(s.T, arg.Vals, 0)
+			stopper = s.Stopper
 		}
 	}
 
 	s.Quantizer = quantize.NewRaBitQuantizer(dims, 42)
 	s.InMemStore = vecstore.NewInMemoryStore(dims, 42)
-	s.Index, err = NewVectorIndex(s.Ctx, s.InMemStore, s.Quantizer, &s.Options)
+	s.Index, err = NewVectorIndex(s.Ctx, s.InMemStore, s.Quantizer, &s.Options, stopper)
 	require.NoError(s.T, err)
 
 	// Insert empty root partition.
@@ -237,12 +254,12 @@ func (s *testState) Insert(d *datadriven.TestData) string {
 		if (i+1)%s.Options.MaxPartitionSize == 0 {
 			// Periodically, run synchronous fixups so that test results are
 			// deterministic.
-			require.NoError(s.T, s.Index.fixups.runAll(s.Ctx))
+			require.NoError(s.T, s.runAllFixups(true /* skipBackground */))
 		}
 	}
 
 	// Handle any remaining fixups.
-	require.NoError(s.T, s.Index.fixups.runAll(s.Ctx))
+	require.NoError(s.T, s.runAllFixups(false /* skipBackground */))
 
 	if hideTree {
 		return fmt.Sprintf("Created index with %d vectors with %d dimensions.\n",
@@ -289,6 +306,66 @@ func (s *testState) Delete(d *datadriven.TestData) string {
 	return tree
 }
 
+func (s *testState) ValidateTree(d *datadriven.TestData) string {
+	txn := beginTransaction(s.Ctx, s.T, s.InMemStore)
+	defer commitTransaction(s.Ctx, s.T, s.InMemStore, txn)
+
+	vectorCount := 0
+	partitionKeys := []vecstore.PartitionKey{vecstore.RootKey}
+	for {
+		// Get all child keys for next level.
+		var childKeys []vecstore.ChildKey
+		for _, key := range partitionKeys {
+			partition, err := s.InMemStore.GetPartition(s.Ctx, txn, key)
+			require.NoError(s.T, err)
+			childKeys = append(childKeys, partition.ChildKeys()...)
+		}
+
+		if len(childKeys) == 0 {
+			break
+		}
+
+		// Verify full vectors exist for the level.
+		refs := make([]vecstore.VectorWithKey, len(childKeys))
+		for i := range childKeys {
+			refs[i].Key = childKeys[i]
+		}
+		err := s.InMemStore.GetFullVectors(s.Ctx, txn, refs)
+		require.NoError(s.T, err)
+		for i := range refs {
+			require.NotNil(s.T, refs[i].Vector)
+		}
+
+		// If this is not the leaf level, then process the next level.
+		if childKeys[0].PrimaryKey == nil {
+			partitionKeys = make([]vecstore.PartitionKey, len(childKeys))
+			for i := range childKeys {
+				partitionKeys[i] = childKeys[i].PartitionKey
+			}
+		} else {
+			// This is the leaf level, so count vectors and end.
+			vectorCount += len(childKeys)
+			break
+		}
+	}
+
+	return fmt.Sprintf("Validated index with %d vectors.\n", vectorCount)
+}
+
+// runAllFixups forces all pending fixups to be processed.
+func (s *testState) runAllFixups(skipBackground bool) error {
+	if s.Index.cancel != nil {
+		// Background fixup goroutine is running, so wait until it has processed
+		// all fixups.
+		if !skipBackground {
+			s.Index.fixups.Wait()
+		}
+		return nil
+	}
+	// Synchronously run fixups.
+	return s.Index.fixups.runAll(s.Ctx)
+}
+
 // parseVector parses a vector string in this form: (1.5, 6, -4).
 func (s *testState) parseVector(str string) vector.T {
 	// Remove parentheses and split by commas.
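
For reference, a minimal usage sketch (not part of the patch) of the new NewVectorIndex signature and the Close method added above. The import paths are taken from the file paths in this diff; the dimension, seed, and option values are illustrative only, and error handling is reduced to a panic for brevity.

	package main

	import (
		"context"

		"github.com/cockroachdb/cockroach/pkg/sql/vecindex"
		"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
		"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
		"github.com/cockroachdb/cockroach/pkg/util/stop"
	)

	func main() {
		ctx := context.Background()
		stopper := stop.NewStopper()
		defer stopper.Stop(ctx)

		// Illustrative quantizer, store, and options (values are placeholders).
		quantizer := quantize.NewRaBitQuantizer(512 /* dims */, 42 /* seed */)
		store := vecstore.NewInMemoryStore(512 /* dims */, 42 /* seed */)
		options := vecindex.VectorIndexOptions{Seed: 42}

		// Passing a non-nil stopper starts the background fixup goroutine.
		index, err := vecindex.NewVectorIndex(ctx, store, quantizer, &options, stopper)
		if err != nil {
			panic(err)
		}
		// Close shuts down the background goroutine once the index is no longer needed.
		defer index.Close()
	}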