Skip to content

Commit

Permalink
Merge #135235
Browse files Browse the repository at this point in the history
135235: vecindex: enable background fixup processing r=drewkimball a=andy-kimball

The vector index now starts up a background goroutine that will process split, merge, and other fixups for the index. A new testing command validates the resulting index.

Epic: CRDB-42943

Release note: None

Co-authored-by: Andrew Kimball <[email protected]>
  • Loading branch information
craig[bot] and andy-kimball committed Nov 15, 2024
2 parents 9ecb09a + cecc413 commit 6da166b
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 69 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/vecindex/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/sql/vecindex/vecstore",
"//pkg/util/log",
"//pkg/util/num32",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/vector",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -43,7 +44,10 @@ go_test(
"//pkg/sql/vecindex/quantize",
"//pkg/sql/vecindex/testutils",
"//pkg/sql/vecindex/vecstore",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/num32",
"//pkg/util/stop",
"//pkg/util/vector",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
41 changes: 39 additions & 2 deletions pkg/sql/vecindex/fixup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package vecindex
import (
"context"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal"
Expand Down Expand Up @@ -87,6 +88,10 @@ type fixupProcessor struct {
// maxFixups limit has been reached.
fixupsLimitHit log.EveryN

// pendingCount tracks the number of pending fixups that still need to be
// processed.
pendingCount sync.WaitGroup

// --------------------------------------------------
// The following fields should only be accessed on a single background
// goroutine (or a single foreground goroutine in deterministic tests).
Expand Down Expand Up @@ -131,8 +136,34 @@ func (fp *fixupProcessor) AddSplit(
})
}

// runAll processes all fixups in the queue. This should only be called by
// tests on one foreground goroutine.
// Start runs the fixup processing loop and is intended to be launched on its
// own background goroutine. Each iteration blocks in run until a fixup has
// been handled; the loop exits only once run reports that nothing was
// processed, which happens when the provided context is canceled.
func (fp *fixupProcessor) Start(ctx context.Context) {
	for {
		// Block until the next queued fixup has been handled.
		processed, err := fp.run(ctx, true /* wait */)
		switch {
		case err != nil:
			// There is no caller to hand the error to on a background
			// goroutine, so record it and move on to the next fixup.
			log.Errorf(ctx, "fixup processor error: %v", err)

		case !processed:
			// The context was canceled, so stop processing.
			return
		}
	}
}

// Wait blocks until every pending fixup has been processed, i.e. until the
// pendingCount wait group drops back to zero. The count is incremented each
// time a fixup is enqueued and decremented each time run finishes one, so this
// returns once the queue has been fully drained. It is mainly useful in tests
// that need the index to settle before inspecting it.
// NOTE(review): if fixups are pending and nothing is draining the queue (e.g.
// Start was never called or its context was canceled), this presumably blocks
// forever -- confirm against callers.
func (fp *fixupProcessor) Wait() {
fp.pendingCount.Wait()
}

// runAll processes all fixups in the queue. This should only be called by tests
// on one foreground goroutine, and only in cases where Start was not called.
func (fp *fixupProcessor) runAll(ctx context.Context) error {
for {
ok, err := fp.run(ctx, false /* wait */)
Expand Down Expand Up @@ -188,6 +219,9 @@ func (fp *fixupProcessor) run(ctx context.Context, wait bool) (ok bool, err erro
fp.mu.Lock()
defer fp.mu.Unlock()

// Decrement the number of pending fixups.
fp.pendingCount.Done()

switch next.Type {
case splitFixup:
key := partitionFixupKey{Type: next.Type, PartitionKey: next.PartitionKey}
Expand Down Expand Up @@ -222,6 +256,9 @@ func (fp *fixupProcessor) addFixup(ctx context.Context, fixup fixup) {
fp.mu.pendingPartitions[key] = true
}

// Increment the number of pending fixups.
fp.pendingCount.Add(1)

// Note that the channel send operation should never block, since it has
// maxFixups capacity.
fp.fixups <- fixup
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/vecindex/fixup_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSplitPartitionData(t *testing.T) {
quantizer := quantize.NewRaBitQuantizer(2, 42)
store := vecstore.NewInMemoryStore(2, 42)
options := VectorIndexOptions{Seed: 42}
index, err := NewVectorIndex(ctx, store, quantizer, &options)
index, err := NewVectorIndex(ctx, store, quantizer, &options, nil /* stopper */)
require.NoError(t, err)

vectors := vector.MakeSetFromRawData([]float32{
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/vecindex/testdata/background-fixups.ddt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Load 500 512-dimension features with background fixups enabled. Validate the
# resulting tree. Note that using background fixups means that the index build
# is non-deterministic, so there are limited validations we can do.

new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 background-fixups hide-tree
----
Created index with 500 vectors with 512 dimensions.

# Traverse the complete tree and ensure that all 500 vectors are present.
validate-tree
----
Validated index with 500 vectors.
124 changes: 62 additions & 62 deletions pkg/sql/vecindex/testdata/search-features.ddt
Original file line number Diff line number Diff line change
@@ -1,96 +1,96 @@
# Load 1000 512-dimension features and search them. Use small partition size to
# Load 1000 512-dimension features and search them. Use small partition size to
# ensure a deeper tree.

new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=1000 hide-tree
new-index dims=512 min-partition-size=4 max-partition-size=16 quality-samples=4 beam-size=2 load-features=1000 hide-tree
----
Created index with 1000 vectors with 512 dimensions.

# Start with 1 result and default beam size of 2.
search max-results=1 use-feature=5000
----
vec302: 0.6601 (centroid=0.4138)
14 leaf vectors, 33 vectors, 4 full vectors, 5 partitions
vec356: 0.5976 (centroid=0.5046)
18 leaf vectors, 34 vectors, 3 full vectors, 4 partitions

# Search for additional results.
search max-results=6 use-feature=5000
----
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec386: 0.7301 (centroid=0.5117)
vec240: 0.7723 (centroid=0.4702)
vec347: 0.7745 (centroid=0.6267)
vec11: 0.777 (centroid=0.5067)
14 leaf vectors, 33 vectors, 10 full vectors, 5 partitions
vec356: 0.5976 (centroid=0.5046)
vec95: 0.7008 (centroid=0.5551)
vec11: 0.777 (centroid=0.6306)
vec848: 0.7958 (centroid=0.5294)
vec246: 0.8141 (centroid=0.5237)
vec650: 0.8432 (centroid=0.6338)
18 leaf vectors, 34 vectors, 10 full vectors, 4 partitions

# Use a larger beam size.
search max-results=6 use-feature=5000 beam-size=8
----
vec771: 0.5624 (centroid=0.4676)
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec386: 0.7301 (centroid=0.5117)
vec240: 0.7723 (centroid=0.4702)
vec347: 0.7745 (centroid=0.6267)
50 leaf vectors, 91 vectors, 12 full vectors, 15 partitions
vec771: 0.5624 (centroid=0.631)
vec356: 0.5976 (centroid=0.5046)
vec640: 0.6525 (centroid=0.6245)
vec329: 0.6871 (centroid=0.5083)
vec95: 0.7008 (centroid=0.5551)
vec386: 0.7301 (centroid=0.5489)
70 leaf vectors, 115 vectors, 17 full vectors, 13 partitions

# Turn off re-ranking, which results in increased inaccuracy.
search max-results=6 use-feature=5000 beam-size=8 skip-rerank
----
vec771: 0.5499 ±0.0291 (centroid=0.4676)
vec302: 0.6246 ±0.0274 (centroid=0.4138)
vec329: 0.6609 ±0.0333 (centroid=0.5033)
vec386: 0.7245 ±0.0338 (centroid=0.5117)
vec347: 0.7279 ±0.0415 (centroid=0.6267)
vec11: 0.7509 ±0.0336 (centroid=0.5067)
50 leaf vectors, 91 vectors, 0 full vectors, 15 partitions
vec771: 0.5937 ±0.0437 (centroid=0.631)
vec356: 0.6205 ±0.0328 (centroid=0.5046)
vec640: 0.6564 ±0.0433 (centroid=0.6245)
vec329: 0.6787 ±0.0311 (centroid=0.5083)
vec95: 0.7056 ±0.0388 (centroid=0.5551)
vec386: 0.7212 ±0.0336 (centroid=0.5489)
70 leaf vectors, 115 vectors, 0 full vectors, 13 partitions

# Return top 25 results with large beam size.
search max-results=25 use-feature=5000 beam-size=64
----
vec771: 0.5624 (centroid=0.4676)
vec356: 0.5976 (centroid=0.5117)
vec640: 0.6525 (centroid=0.6139)
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec95: 0.7008 (centroid=0.5542)
vec249: 0.7268 (centroid=0.3715)
vec386: 0.7301 (centroid=0.5117)
vec309: 0.7311 (centroid=0.4912)
vec633: 0.7513 (centroid=0.4095)
vec117: 0.7576 (centroid=0.4538)
vec556: 0.7595 (centroid=0.5531)
vec25: 0.761 (centroid=0.4576)
vec872: 0.7707 (centroid=0.6427)
vec859: 0.7708 (centroid=0.6614)
vec240: 0.7723 (centroid=0.4702)
vec347: 0.7745 (centroid=0.6267)
vec11: 0.777 (centroid=0.5067)
vec340: 0.7858 (centroid=0.4752)
vec239: 0.7878 (centroid=0.4584)
vec704: 0.7916 (centroid=0.7117)
vec423: 0.7956 (centroid=0.4608)
vec220: 0.7957 (centroid=0.4226)
vec387: 0.8038 (centroid=0.4652)
vec637: 0.8039 (centroid=0.5211)
356 leaf vectors, 567 vectors, 97 full vectors, 103 partitions
vec771: 0.5624 (centroid=0.631)
vec356: 0.5976 (centroid=0.5046)
vec640: 0.6525 (centroid=0.6245)
vec302: 0.6601 (centroid=0.5159)
vec329: 0.6871 (centroid=0.5083)
vec95: 0.7008 (centroid=0.5551)
vec249: 0.7268 (centroid=0.4459)
vec386: 0.7301 (centroid=0.5489)
vec309: 0.7311 (centroid=0.5569)
vec633: 0.7513 (centroid=0.4747)
vec117: 0.7576 (centroid=0.5211)
vec556: 0.7595 (centroid=0.459)
vec25: 0.761 (centroid=0.4394)
vec776: 0.7633 (centroid=0.4892)
vec872: 0.7707 (centroid=0.5141)
vec859: 0.7708 (centroid=0.5757)
vec240: 0.7723 (centroid=0.5266)
vec347: 0.7745 (centroid=0.5297)
vec11: 0.777 (centroid=0.6306)
vec340: 0.7858 (centroid=0.5312)
vec239: 0.7878 (centroid=0.5127)
vec704: 0.7916 (centroid=0.5169)
vec423: 0.7956 (centroid=0.4941)
vec220: 0.7957 (centroid=0.4916)
vec848: 0.7958 (centroid=0.5294)
683 leaf vectors, 787 vectors, 100 full vectors, 74 partitions

# Test recall at different beam sizes.
recall topk=10 beam-size=4 samples=50
----
50.00% recall@10
44.26 leaf vectors, 75.42 vectors, 20.38 full vectors, 7.00 partitions

recall topk=10 beam-size=8 samples=50
----
53.60% recall@10
46.62 leaf vectors, 86.08 vectors, 20.18 full vectors, 15.00 partitions
70.40% recall@10
85.90 leaf vectors, 136.26 vectors, 24.44 full vectors, 13.00 partitions

recall topk=10 beam-size=16 samples=50
----
76.40% recall@10
94.02 leaf vectors, 168.58 vectors, 24.84 full vectors, 29.00 partitions
85.20% recall@10
169.94 leaf vectors, 263.62 vectors, 27.90 full vectors, 25.00 partitions

recall topk=10 beam-size=32 samples=50
----
91.80% recall@10
188.30 leaf vectors, 317.30 vectors, 28.52 full vectors, 55.00 partitions

recall topk=10 beam-size=64 samples=50
----
97.40% recall@10
371.40 leaf vectors, 585.00 vectors, 31.60 full vectors, 103.00 partitions
97.00% recall@10
336.46 leaf vectors, 440.46 vectors, 31.52 full vectors, 42.00 partitions
23 changes: 23 additions & 0 deletions pkg/sql/vecindex/vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
"github.com/cockroachdb/cockroach/pkg/util/num32"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/vector"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -127,15 +128,21 @@ type VectorIndex struct {
// fixups runs index maintenance operations like split and merge on a
// background goroutine.
fixups fixupProcessor
// cancel stops the background fixup processing goroutine.
cancel func()
}

// NewVectorIndex constructs a new vector index instance. Typically, only one
// VectorIndex instance should be created for each index in the process.
// NOTE: If "stopper" is not nil, then the vector index will start a background
// goroutine to process index fixups. When the index is no longer needed, the
// caller must call Close to shut down the background goroutine.
func NewVectorIndex(
ctx context.Context,
store vecstore.Store,
quantizer quantize.Quantizer,
options *VectorIndexOptions,
stopper *stop.Stopper,
) (*VectorIndex, error) {
vi := &VectorIndex{
options: *options,
Expand All @@ -158,9 +165,25 @@ func NewVectorIndex(

vi.fixups.Init(vi, options.Seed)

if stopper != nil {
// Start the background goroutine.
ctx, vi.cancel = stopper.WithCancelOnQuiesce(ctx)
err := stopper.RunAsyncTask(ctx, "vecindex-fixups", vi.fixups.Start)
if err != nil {
return nil, errors.Wrap(err, "starting fixup processor")
}
}

return vi, nil
}

// Close signals the background fixup goroutine to stop by invoking the cancel
// function recorded when the index was constructed. It is a no-op when no
// cancel function was set, which is the case when no background goroutine was
// started.
func (vi *VectorIndex) Close() {
	if vi.cancel == nil {
		// No background goroutine was started, so there is nothing to cancel.
		return
	}
	vi.cancel()
}

// CreateRoot creates an empty root partition in the store. This should only be
// called once when the index is first created.
func (vi *VectorIndex) CreateRoot(ctx context.Context, txn vecstore.Txn) error {
Expand Down
Loading

0 comments on commit 6da166b

Please sign in to comment.