diff --git a/pkg/sql/vecindex/BUILD.bazel b/pkg/sql/vecindex/BUILD.bazel
index 56d6486a96b0..b11f0a052005 100644
--- a/pkg/sql/vecindex/BUILD.bazel
+++ b/pkg/sql/vecindex/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
         "//pkg/sql/vecindex/vecstore",
         "//pkg/util/log",
         "//pkg/util/num32",
+        "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/vector",
         "@com_github_cockroachdb_errors//:errors",
@@ -43,7 +44,10 @@ go_test(
        "//pkg/sql/vecindex/quantize",
        "//pkg/sql/vecindex/testutils",
        "//pkg/sql/vecindex/vecstore",
+       "//pkg/util/leaktest",
+       "//pkg/util/log",
        "//pkg/util/num32",
+       "//pkg/util/stop",
        "//pkg/util/vector",
        "@com_github_cockroachdb_datadriven//:datadriven",
        "@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/sql/vecindex/fixup_processor.go b/pkg/sql/vecindex/fixup_processor.go
index 53ce1ace39a2..98c927f1c04e 100644
--- a/pkg/sql/vecindex/fixup_processor.go
+++ b/pkg/sql/vecindex/fixup_processor.go
@@ -8,6 +8,7 @@ package vecindex
 import (
 	"context"
 	"math/rand"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal"
@@ -87,6 +88,10 @@ type fixupProcessor struct {
 	// maxFixups limit has been reached.
 	fixupsLimitHit log.EveryN
 
+	// pendingCount tracks the number of pending fixups that still need to be
+	// processed.
+	pendingCount sync.WaitGroup
+
 	// --------------------------------------------------
 	// The following fields should only be accessed on a single background
 	// goroutine (or a single foreground goroutine in deterministic tests).
@@ -131,8 +136,34 @@ func (fp *fixupProcessor) AddSplit(
 	})
 }
 
-// runAll processes all fixups in the queue. This should only be called by
-// tests on one foreground goroutine.
+// Start is meant to be called on a background goroutine. It runs until the
+// provided context is canceled, processing fixups as they are added to the
+// fixup processor.
+func (fp *fixupProcessor) Start(ctx context.Context) {
+	for {
+		// Wait to run the next fixup in the queue.
+		ok, err := fp.run(ctx, true /* wait */)
+		if err != nil {
+			// This is a background goroutine, so just log error and continue.
+			log.Errorf(ctx, "fixup processor error: %v", err)
+			continue
+		}
+
+		if !ok {
+			// Context was canceled, so exit.
+			return
+		}
+	}
+}
+
+// Wait blocks until all pending fixups have been processed by the background
+// goroutine. This is useful in testing.
+func (fp *fixupProcessor) Wait() {
+	fp.pendingCount.Wait()
+}
+
+// runAll processes all fixups in the queue. This should only be called by tests
+// on one foreground goroutine, and only in cases where Start was not called.
 func (fp *fixupProcessor) runAll(ctx context.Context) error {
 	for {
 		ok, err := fp.run(ctx, false /* wait */)
@@ -188,6 +219,9 @@ func (fp *fixupProcessor) run(ctx context.Context, wait bool) (ok bool, err erro
 	fp.mu.Lock()
 	defer fp.mu.Unlock()
 
+	// Decrement the number of pending fixups.
+	fp.pendingCount.Done()
+
 	switch next.Type {
 	case splitFixup:
 		key := partitionFixupKey{Type: next.Type, PartitionKey: next.PartitionKey}
@@ -222,6 +256,9 @@ func (fp *fixupProcessor) addFixup(ctx context.Context, fixup fixup) {
 		fp.mu.pendingPartitions[key] = true
 	}
 
+	// Increment the number of pending fixups.
+	fp.pendingCount.Add(1)
+
 	// Note that the channel send operation should never block, since it has
 	// maxFixups capacity.
 	fp.fixups <- fixup
diff --git a/pkg/sql/vecindex/fixup_processor_test.go b/pkg/sql/vecindex/fixup_processor_test.go
index 1dfa51117190..8bf115ef0483 100644
--- a/pkg/sql/vecindex/fixup_processor_test.go
+++ b/pkg/sql/vecindex/fixup_processor_test.go
@@ -22,7 +22,7 @@ func TestSplitPartitionData(t *testing.T) {
 	quantizer := quantize.NewRaBitQuantizer(2, 42)
 	store := vecstore.NewInMemoryStore(2, 42)
 	options := VectorIndexOptions{Seed: 42}
-	index, err := NewVectorIndex(ctx, store, quantizer, &options)
+	index, err := NewVectorIndex(ctx, store, quantizer, &options, nil /* stopper */)
 	require.NoError(t, err)
 
 	vectors := vector.MakeSetFromRawData([]float32{
diff --git a/pkg/sql/vecindex/testdata/background-fixups.ddt b/pkg/sql/vecindex/testdata/background-fixups.ddt
new file mode 100644
index 000000000000..cfec79b67793
--- /dev/null
+++ b/pkg/sql/vecindex/testdata/background-fixups.ddt
@@ -0,0 +1,12 @@
+# Load 500 512-dimension features with background fixups enabled. Validate the
+# resulting tree. Note that using background fixups means that the index build
+# is non-deterministic, so there are limited validations we can do.
+
+new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 background-fixups hide-tree
+----
+Created index with 500 vectors with 512 dimensions.
+
+# Traverse the complete tree and ensure that all 500 vectors are present.
+validate-tree
+----
+Validated index with 500 vectors.
diff --git a/pkg/sql/vecindex/testdata/search-features.ddt b/pkg/sql/vecindex/testdata/search-features.ddt
index 2b2c2008954a..8c6f2c557d16 100644
--- a/pkg/sql/vecindex/testdata/search-features.ddt
+++ b/pkg/sql/vecindex/testdata/search-features.ddt
@@ -1,96 +1,96 @@
 # Load 1000 512-dimension features and search them. Use small partition size to
 # ensure a deeper tree.
-new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=1000 hide-tree
+new-index dims=512 min-partition-size=4 max-partition-size=16 quality-samples=4 beam-size=2 load-features=1000 hide-tree
 ----
 Created index with 1000 vectors with 512 dimensions.
 
 # Start with 1 result and default beam size of 2.
 search max-results=1 use-feature=5000
 ----
-vec302: 0.6601 (centroid=0.4138)
-14 leaf vectors, 33 vectors, 4 full vectors, 5 partitions
+vec356: 0.5976 (centroid=0.5046)
+18 leaf vectors, 34 vectors, 3 full vectors, 4 partitions
 
 # Search for additional results.
 search max-results=6 use-feature=5000
 ----
-vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
-vec386: 0.7301 (centroid=0.5117)
-vec240: 0.7723 (centroid=0.4702)
-vec347: 0.7745 (centroid=0.6267)
-vec11: 0.777 (centroid=0.5067)
-14 leaf vectors, 33 vectors, 10 full vectors, 5 partitions
+vec356: 0.5976 (centroid=0.5046)
+vec95: 0.7008 (centroid=0.5551)
+vec11: 0.777 (centroid=0.6306)
+vec848: 0.7958 (centroid=0.5294)
+vec246: 0.8141 (centroid=0.5237)
+vec650: 0.8432 (centroid=0.6338)
+18 leaf vectors, 34 vectors, 10 full vectors, 4 partitions
 
 # Use a larger beam size.
 search max-results=6 use-feature=5000 beam-size=8
 ----
-vec771: 0.5624 (centroid=0.4676)
-vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
-vec386: 0.7301 (centroid=0.5117)
-vec240: 0.7723 (centroid=0.4702)
-vec347: 0.7745 (centroid=0.6267)
-50 leaf vectors, 91 vectors, 12 full vectors, 15 partitions
+vec771: 0.5624 (centroid=0.631)
+vec356: 0.5976 (centroid=0.5046)
+vec640: 0.6525 (centroid=0.6245)
+vec329: 0.6871 (centroid=0.5083)
+vec95: 0.7008 (centroid=0.5551)
+vec386: 0.7301 (centroid=0.5489)
+70 leaf vectors, 115 vectors, 17 full vectors, 13 partitions
 
 # Turn off re-ranking, which results in increased inaccuracy.
 search max-results=6 use-feature=5000 beam-size=8 skip-rerank
 ----
-vec771: 0.5499 ±0.0291 (centroid=0.4676)
-vec302: 0.6246 ±0.0274 (centroid=0.4138)
-vec329: 0.6609 ±0.0333 (centroid=0.5033)
-vec386: 0.7245 ±0.0338 (centroid=0.5117)
-vec347: 0.7279 ±0.0415 (centroid=0.6267)
-vec11: 0.7509 ±0.0336 (centroid=0.5067)
-50 leaf vectors, 91 vectors, 0 full vectors, 15 partitions
+vec771: 0.5937 ±0.0437 (centroid=0.631)
+vec356: 0.6205 ±0.0328 (centroid=0.5046)
+vec640: 0.6564 ±0.0433 (centroid=0.6245)
+vec329: 0.6787 ±0.0311 (centroid=0.5083)
+vec95: 0.7056 ±0.0388 (centroid=0.5551)
+vec386: 0.7212 ±0.0336 (centroid=0.5489)
+70 leaf vectors, 115 vectors, 0 full vectors, 13 partitions
 
 # Return top 25 results with large beam size.
 search max-results=25 use-feature=5000 beam-size=64
 ----
-vec771: 0.5624 (centroid=0.4676)
-vec356: 0.5976 (centroid=0.5117)
-vec640: 0.6525 (centroid=0.6139)
-vec302: 0.6601 (centroid=0.4138)
-vec329: 0.6871 (centroid=0.5033)
-vec95: 0.7008 (centroid=0.5542)
-vec249: 0.7268 (centroid=0.3715)
-vec386: 0.7301 (centroid=0.5117)
-vec309: 0.7311 (centroid=0.4912)
-vec633: 0.7513 (centroid=0.4095)
-vec117: 0.7576 (centroid=0.4538)
-vec556: 0.7595 (centroid=0.5531)
-vec25: 0.761 (centroid=0.4576)
-vec872: 0.7707 (centroid=0.6427)
-vec859: 0.7708 (centroid=0.6614)
-vec240: 0.7723 (centroid=0.4702)
-vec347: 0.7745 (centroid=0.6267)
-vec11: 0.777 (centroid=0.5067)
-vec340: 0.7858 (centroid=0.4752)
-vec239: 0.7878 (centroid=0.4584)
-vec704: 0.7916 (centroid=0.7117)
-vec423: 0.7956 (centroid=0.4608)
-vec220: 0.7957 (centroid=0.4226)
-vec387: 0.8038 (centroid=0.4652)
-vec637: 0.8039 (centroid=0.5211)
-356 leaf vectors, 567 vectors, 97 full vectors, 103 partitions
+vec771: 0.5624 (centroid=0.631)
+vec356: 0.5976 (centroid=0.5046)
+vec640: 0.6525 (centroid=0.6245)
+vec302: 0.6601 (centroid=0.5159)
+vec329: 0.6871 (centroid=0.5083)
+vec95: 0.7008 (centroid=0.5551)
+vec249: 0.7268 (centroid=0.4459)
+vec386: 0.7301 (centroid=0.5489)
+vec309: 0.7311 (centroid=0.5569)
+vec633: 0.7513 (centroid=0.4747)
+vec117: 0.7576 (centroid=0.5211)
+vec556: 0.7595 (centroid=0.459)
+vec25: 0.761 (centroid=0.4394)
+vec776: 0.7633 (centroid=0.4892)
+vec872: 0.7707 (centroid=0.5141)
+vec859: 0.7708 (centroid=0.5757)
+vec240: 0.7723 (centroid=0.5266)
+vec347: 0.7745 (centroid=0.5297)
+vec11: 0.777 (centroid=0.6306)
+vec340: 0.7858 (centroid=0.5312)
+vec239: 0.7878 (centroid=0.5127)
+vec704: 0.7916 (centroid=0.5169)
+vec423: 0.7956 (centroid=0.4941)
+vec220: 0.7957 (centroid=0.4916)
+vec848: 0.7958 (centroid=0.5294)
+683 leaf vectors, 787 vectors, 100 full vectors, 74 partitions
 
 # Test recall at different beam sizes.
+recall topk=10 beam-size=4 samples=50
+----
+50.00% recall@10
+44.26 leaf vectors, 75.42 vectors, 20.38 full vectors, 7.00 partitions
+
 recall topk=10 beam-size=8 samples=50
 ----
-53.60% recall@10
-46.62 leaf vectors, 86.08 vectors, 20.18 full vectors, 15.00 partitions
+70.40% recall@10
+85.90 leaf vectors, 136.26 vectors, 24.44 full vectors, 13.00 partitions
 
 recall topk=10 beam-size=16 samples=50
 ----
-76.40% recall@10
-94.02 leaf vectors, 168.58 vectors, 24.84 full vectors, 29.00 partitions
+85.20% recall@10
+169.94 leaf vectors, 263.62 vectors, 27.90 full vectors, 25.00 partitions
 
 recall topk=10 beam-size=32 samples=50
 ----
-91.80% recall@10
-188.30 leaf vectors, 317.30 vectors, 28.52 full vectors, 55.00 partitions
-
-recall topk=10 beam-size=64 samples=50
-----
-97.40% recall@10
-371.40 leaf vectors, 585.00 vectors, 31.60 full vectors, 103.00 partitions
+97.00% recall@10
+336.46 leaf vectors, 440.46 vectors, 31.52 full vectors, 42.00 partitions
diff --git a/pkg/sql/vecindex/vector_index.go b/pkg/sql/vecindex/vector_index.go
index cc560d909d33..7f1dbb7c1eb9 100644
--- a/pkg/sql/vecindex/vector_index.go
+++ b/pkg/sql/vecindex/vector_index.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
 	"github.com/cockroachdb/cockroach/pkg/util/num32"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/vector"
 	"github.com/cockroachdb/errors"
 )
@@ -127,15 +128,21 @@ type VectorIndex struct {
 	// fixups runs index maintenance operations like split and merge on a
 	// background goroutine.
 	fixups fixupProcessor
+	// cancel stops the background fixup processing goroutine.
+	cancel func()
 }
 
 // NewVectorIndex constructs a new vector index instance. Typically, only one
 // VectorIndex instance should be created for each index in the process.
+// NOTE: If "stopper" is not nil, then the vector index will start a background
+// goroutine to process index fixups. When the index is no longer needed, the
+// caller must call Close to shut down the background goroutine.
 func NewVectorIndex(
 	ctx context.Context,
 	store vecstore.Store,
 	quantizer quantize.Quantizer,
 	options *VectorIndexOptions,
+	stopper *stop.Stopper,
 ) (*VectorIndex, error) {
 	vi := &VectorIndex{
 		options: *options,
@@ -158,9 +165,25 @@ func NewVectorIndex(
 
 	vi.fixups.Init(vi, options.Seed)
 
+	if stopper != nil {
+		// Start the background goroutine.
+		ctx, vi.cancel = stopper.WithCancelOnQuiesce(ctx)
+		err := stopper.RunAsyncTask(ctx, "vecindex-fixups", vi.fixups.Start)
+		if err != nil {
+			return nil, errors.Wrap(err, "starting fixup processor")
+		}
+	}
+
 	return vi, nil
 }
 
+// Close cancels the background goroutine, if it's running.
+func (vi *VectorIndex) Close() {
+	if vi.cancel != nil {
+		vi.cancel()
+	}
+}
+
 // CreateRoot creates an empty root partition in the store. This should only be
 // called once when the index is first created.
 func (vi *VectorIndex) CreateRoot(ctx context.Context, txn vecstore.Txn) error {
diff --git a/pkg/sql/vecindex/vector_index_test.go b/pkg/sql/vecindex/vector_index_test.go
index 9ecd499b9671..21e6b32149ce 100644
--- a/pkg/sql/vecindex/vector_index_test.go
+++ b/pkg/sql/vecindex/vector_index_test.go
@@ -19,7 +19,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/testutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/num32"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/vector"
 	"github.com/cockroachdb/datadriven"
 	"github.com/cockroachdb/errors"
@@ -27,8 +30,13 @@ import (
 )
 
 func TestDataDriven(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	ctx := internal.WithWorkspace(context.Background(), &internal.Workspace{})
-	state := testState{T: t, Ctx: ctx}
+	state := testState{T: t, Ctx: ctx, Stopper: stop.NewStopper()}
+	defer state.Stopper.Stop(ctx)
+
 	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
 		if !strings.HasSuffix(path, ".ddt") {
 			// Skip files that are not data-driven tests.
@@ -53,6 +61,9 @@ func TestDataDriven(t *testing.T) {
 
 			case "recall":
 				return state.Recall(d)
+
+			case "validate-tree":
+				return state.ValidateTree(d)
 			}
 
 			t.Fatalf("unknown cmd: %s", d.Cmd)
@@ -64,6 +75,7 @@ func TestDataDriven(t *testing.T) {
 type testState struct {
 	T          *testing.T
 	Ctx        context.Context
+	Stopper    *stop.Stopper
 	Quantizer  quantize.Quantizer
 	InMemStore *vecstore.InMemoryStore
 	Index      *VectorIndex
@@ -73,6 +85,7 @@ type testState struct {
 
 func (s *testState) NewIndex(d *datadriven.TestData) string {
 	var err error
+	var stopper *stop.Stopper
 	dims := 2
 	s.Options = VectorIndexOptions{Seed: 42}
 	for _, arg := range d.CmdArgs {
@@ -101,12 +114,16 @@ func (s *testState) NewIndex(d *datadriven.TestData) string {
 			require.Len(s.T, arg.Vals, 1)
 			s.Options.BaseBeamSize, err = strconv.Atoi(arg.Vals[0])
 			require.NoError(s.T, err)
+
+		case "background-fixups":
+			require.Len(s.T, arg.Vals, 0)
+			stopper = s.Stopper
 		}
 	}
 
 	s.Quantizer = quantize.NewRaBitQuantizer(dims, 42)
 	s.InMemStore = vecstore.NewInMemoryStore(dims, 42)
-	s.Index, err = NewVectorIndex(s.Ctx, s.InMemStore, s.Quantizer, &s.Options)
+	s.Index, err = NewVectorIndex(s.Ctx, s.InMemStore, s.Quantizer, &s.Options, stopper)
 	require.NoError(s.T, err)
 
 	// Insert empty root partition.
@@ -244,12 +261,12 @@ func (s *testState) Insert(d *datadriven.TestData) string {
 		if (i+1)%s.Options.MaxPartitionSize == 0 {
 			// Periodically, run synchronous fixups so that test results are
 			// deterministic.
-			require.NoError(s.T, s.Index.fixups.runAll(s.Ctx))
+			require.NoError(s.T, s.runAllFixups(true /* skipBackground */))
 		}
 	}
 
 	// Handle any remaining fixups.
-	require.NoError(s.T, s.Index.fixups.runAll(s.Ctx))
+	require.NoError(s.T, s.runAllFixups(false /* skipBackground */))
 
 	if hideTree {
 		return fmt.Sprintf("Created index with %d vectors with %d dimensions.\n",
@@ -404,6 +421,66 @@ func (s *testState) Recall(d *datadriven.TestData) string {
 	return buf.String()
 }
 
+func (s *testState) ValidateTree(d *datadriven.TestData) string {
+	txn := beginTransaction(s.Ctx, s.T, s.InMemStore)
+	defer commitTransaction(s.Ctx, s.T, s.InMemStore, txn)
+
+	vectorCount := 0
+	partitionKeys := []vecstore.PartitionKey{vecstore.RootKey}
+	for {
+		// Get all child keys for next level.
+		var childKeys []vecstore.ChildKey
+		for _, key := range partitionKeys {
+			partition, err := s.InMemStore.GetPartition(s.Ctx, txn, key)
+			require.NoError(s.T, err)
+			childKeys = append(childKeys, partition.ChildKeys()...)
+		}
+
+		if len(childKeys) == 0 {
+			break
+		}
+
+		// Verify full vectors exist for the level.
+		refs := make([]vecstore.VectorWithKey, len(childKeys))
+		for i := range childKeys {
+			refs[i].Key = childKeys[i]
+		}
+		err := s.InMemStore.GetFullVectors(s.Ctx, txn, refs)
+		require.NoError(s.T, err)
+		for i := range refs {
+			require.NotNil(s.T, refs[i].Vector)
+		}
+
+		// If this is not the leaf level, then process the next level.
+		if childKeys[0].PrimaryKey == nil {
+			partitionKeys = make([]vecstore.PartitionKey, len(childKeys))
+			for i := range childKeys {
+				partitionKeys[i] = childKeys[i].PartitionKey
+			}
+		} else {
+			// This is the leaf level, so count vectors and end.
+			vectorCount += len(childKeys)
+			break
+		}
+	}
+
+	return fmt.Sprintf("Validated index with %d vectors.\n", vectorCount)
+}
+
+// runAllFixups forces all pending fixups to be processed.
+func (s *testState) runAllFixups(skipBackground bool) error {
+	if s.Index.cancel != nil {
+		// Background fixup goroutine is running, so wait until it has processed
+		// all fixups.
+		if !skipBackground {
+			s.Index.fixups.Wait()
+		}
+		return nil
+	}
+	// Synchronously run fixups.
+	return s.Index.fixups.runAll(s.Ctx)
+}
+
 // parseVector parses a vector string in this form: (1.5, 6, -4).
 func (s *testState) parseVector(str string) vector.T {
 	// Remove parentheses and split by commas.
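
For context, here is a minimal sketch of how a caller might use the new stopper-based API, based only on the call sites visible in this diff; the import path, dimensions, and seed values are assumptions for illustration, not part of the patch:

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/vecindex"
	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
	"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

func main() {
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	// Construct the index dependencies (dims and seed are illustrative).
	quantizer := quantize.NewRaBitQuantizer(512, 42)
	store := vecstore.NewInMemoryStore(512, 42)
	options := vecindex.VectorIndexOptions{Seed: 42}

	// Passing a non-nil stopper starts the background fixup goroutine;
	// passing nil keeps the previous synchronous-only behavior.
	index, err := vecindex.NewVectorIndex(ctx, store, quantizer, &options, stopper)
	if err != nil {
		panic(err)
	}
	// The caller must call Close so the background goroutine exits.
	defer index.Close()
}
```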