Skip to content

Commit

Permalink
vecindex: enable background fixup processing
Browse files Browse the repository at this point in the history
The vector index now starts up a background goroutine that will process
split, merge, and other fixups for the index. A new testing command
validates the resulting index.

Epic: CRDB-42943

Release note: None
  • Loading branch information
andy-kimball committed Nov 15, 2024
1 parent 89df825 commit 6d9007c
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 43 deletions.
41 changes: 39 additions & 2 deletions pkg/sql/vecindex/fixup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package vecindex
import (
"context"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal"
Expand Down Expand Up @@ -87,6 +88,10 @@ type fixupProcessor struct {
// maxFixups limit has been reached.
fixupsLimitHit log.EveryN

// pendingCount tracks the number of pending fixups that still need to be
// processed.
pendingCount sync.WaitGroup

// --------------------------------------------------
// The following fields should only be accessed on a single background
// goroutine (or a single foreground goroutine in deterministic tests).
Expand Down Expand Up @@ -131,8 +136,34 @@ func (fp *fixupProcessor) AddSplit(
})
}

// runAll processes all fixups in the queue. This should only be called by
// tests on one foreground goroutine.
// Start drives the fixup processing loop and is intended to be the body of a
// background goroutine. Each iteration blocks in run until a fixup has been
// handled or the provided context is canceled. Errors from individual fixups
// are logged rather than returned, since there is no caller to report them to,
// and the loop keeps going. The loop ends once run reports that the context
// was canceled.
func (fp *fixupProcessor) Start(ctx context.Context) {
	for {
		// Block until the next queued fixup has been run.
		more, err := fp.run(ctx, true /* wait */)
		switch {
		case err != nil:
			// Nothing upstream can handle the error on this goroutine, so
			// record it and move on to the next fixup.
			log.Errorf(ctx, "fixup processor error: %v", err)
		case !more:
			// The context was canceled; stop processing.
			return
		}
	}
}

// Wait returns once the pendingCount wait group drops to zero, i.e. when every
// fixup that was counted as pending has been marked done by the fixup
// processor. It is mainly intended for tests that need the background
// goroutine to catch up before they inspect the index.
func (fp *fixupProcessor) Wait() {
	fp.pendingCount.Wait()
}

// runAll processes all fixups in the queue. This should only be called by tests
// on one foreground goroutine, and only in cases where Start was not called.
func (fp *fixupProcessor) runAll(ctx context.Context) error {
for {
ok, err := fp.run(ctx, false /* wait */)
Expand Down Expand Up @@ -188,6 +219,9 @@ func (fp *fixupProcessor) run(ctx context.Context, wait bool) (ok bool, err erro
fp.mu.Lock()
defer fp.mu.Unlock()

// Decrement the number of pending fixups.
fp.pendingCount.Done()

switch next.Type {
case splitFixup:
key := partitionFixupKey{Type: next.Type, PartitionKey: next.PartitionKey}
Expand Down Expand Up @@ -222,6 +256,9 @@ func (fp *fixupProcessor) addFixup(ctx context.Context, fixup fixup) {
fp.mu.pendingPartitions[key] = true
}

// Increment the number of pending fixups.
fp.pendingCount.Add(1)

// Note that the channel send operation should never block, since it has
// maxFixups capacity.
fp.fixups <- fixup
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/vecindex/fixup_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSplitPartitionData(t *testing.T) {
quantizer := quantize.NewRaBitQuantizer(2, 42)
store := vecstore.NewInMemoryStore(2, 42)
options := VectorIndexOptions{Seed: 42}
index, err := NewVectorIndex(ctx, store, quantizer, &options)
index, err := NewVectorIndex(ctx, store, quantizer, &options, nil /* stopper */)
require.NoError(t, err)

vectors := vector.MakeSetFromRawData([]float32{
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/vecindex/testdata/background-fixups.ddt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Load 500 512-dimension features with background fixups enabled. Validate the
# resulting tree. Note that using background fixups means that the index build
# is non-deterministic, so there are limited validations we can do.

new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 background-fixups hide-tree
----
Created index with 500 vectors with 512 dimensions.

# Traverse the complete tree and ensure that all 500 vectors are present.
validate-tree
----
Validated index with 500 vectors.
72 changes: 36 additions & 36 deletions pkg/sql/vecindex/testdata/search-features.ddt
Original file line number Diff line number Diff line change
@@ -1,75 +1,75 @@
# Load 1000 512-dimension features and search them. Use small partition size to
# Load 500 512-dimension features and search them. Use small partition size to
# ensure a deeper tree.

new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=1000 hide-tree
new-index dims=512 min-partition-size=2 max-partition-size=8 quality-samples=4 beam-size=2 load-features=500 hide-tree
----
Created index with 1000 vectors with 512 dimensions.
Created index with 500 vectors with 512 dimensions.

# Start with 1 result and default beam size of 2.
search max-results=1 use-feature=5000
----
vec302: 0.6601 (centroid=0.4138)
14 leaf vectors, 33 vectors, 4 full vectors, 5 partitions
9 leaf vectors, 25 vectors, 3 full vectors, 5 partitions

# Search for additional results.
search max-results=6 use-feature=5000
----
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec386: 0.7301 (centroid=0.5117)
vec240: 0.7723 (centroid=0.4702)
vec347: 0.7745 (centroid=0.6267)
vec11: 0.777 (centroid=0.5067)
14 leaf vectors, 33 vectors, 10 full vectors, 5 partitions
vec208: 0.9026 (centroid=0.5818)
vec435: 0.9124 (centroid=0.7488)
vec223: 0.9145 (centroid=0.5778)
9 leaf vectors, 25 vectors, 9 full vectors, 5 partitions

# Use a larger beam size.
search max-results=6 use-feature=5000 beam-size=8
----
vec771: 0.5624 (centroid=0.4676)
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec386: 0.7301 (centroid=0.5117)
vec240: 0.7723 (centroid=0.4702)
vec329: 0.6871 (centroid=0.4672)
vec386: 0.7301 (centroid=0.6006)
vec309: 0.7311 (centroid=0.4039)
vec240: 0.7723 (centroid=0.5003)
vec347: 0.7745 (centroid=0.6267)
50 leaf vectors, 91 vectors, 12 full vectors, 15 partitions
46 leaf vectors, 81 vectors, 22 full vectors, 15 partitions

# Turn off re-ranking, which results in increased inaccuracy.
search max-results=6 use-feature=5000 beam-size=8 skip-rerank
----
vec771: 0.5499 ±0.0291 (centroid=0.4676)
vec302: 0.6246 ±0.0274 (centroid=0.4138)
vec329: 0.6609 ±0.0333 (centroid=0.5033)
vec386: 0.7245 ±0.0338 (centroid=0.5117)
vec329: 0.6501 ±0.031 (centroid=0.4672)
vec386: 0.7132 ±0.0398 (centroid=0.6006)
vec347: 0.7279 ±0.0415 (centroid=0.6267)
vec309: 0.73 ±0.0273 (centroid=0.4039)
vec11: 0.7509 ±0.0336 (centroid=0.5067)
50 leaf vectors, 91 vectors, 0 full vectors, 15 partitions
46 leaf vectors, 81 vectors, 0 full vectors, 15 partitions

# Return top 25 results with large beam size.
search max-results=25 use-feature=5000 beam-size=64
----
vec771: 0.5624 (centroid=0.4676)
vec356: 0.5976 (centroid=0.5117)
vec640: 0.6525 (centroid=0.6139)
vec356: 0.5976 (centroid=0.5821)
vec302: 0.6601 (centroid=0.4138)
vec329: 0.6871 (centroid=0.5033)
vec329: 0.6871 (centroid=0.4672)
vec95: 0.7008 (centroid=0.5542)
vec249: 0.7268 (centroid=0.3715)
vec386: 0.7301 (centroid=0.5117)
vec309: 0.7311 (centroid=0.4912)
vec633: 0.7513 (centroid=0.4095)
vec249: 0.7268 (centroid=0.4042)
vec386: 0.7301 (centroid=0.6006)
vec309: 0.7311 (centroid=0.4039)
vec117: 0.7576 (centroid=0.4538)
vec556: 0.7595 (centroid=0.5531)
vec25: 0.761 (centroid=0.4576)
vec872: 0.7707 (centroid=0.6427)
vec859: 0.7708 (centroid=0.6614)
vec240: 0.7723 (centroid=0.4702)
vec240: 0.7723 (centroid=0.5003)
vec347: 0.7745 (centroid=0.6267)
vec11: 0.777 (centroid=0.5067)
vec340: 0.7858 (centroid=0.4752)
vec239: 0.7878 (centroid=0.4584)
vec704: 0.7916 (centroid=0.7117)
vec423: 0.7956 (centroid=0.4608)
vec340: 0.7858 (centroid=0.5942)
vec239: 0.7878 (centroid=0.4654)
vec423: 0.7956 (centroid=0.6041)
vec220: 0.7957 (centroid=0.4226)
vec387: 0.8038 (centroid=0.4652)
vec637: 0.8039 (centroid=0.5211)
356 leaf vectors, 567 vectors, 97 full vectors, 103 partitions
vec387: 0.8038 (centroid=0.4954)
vec410: 0.8062 (centroid=0.4643)
vec52: 0.8068 (centroid=0.5793)
vec379: 0.8082 (centroid=0.3691)
vec457: 0.8084 (centroid=0.5842)
vec184: 0.8139 (centroid=0.4777)
vec246: 0.8141 (centroid=0.5273)
vec493: 0.8184 (centroid=0.6874)
vec129: 0.8211 (centroid=0.432)
338 leaf vectors, 453 vectors, 78 full vectors, 85 partitions
23 changes: 23 additions & 0 deletions pkg/sql/vecindex/vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/quantize"
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore"
"github.com/cockroachdb/cockroach/pkg/util/num32"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/vector"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -127,15 +128,21 @@ type VectorIndex struct {
// fixups runs index maintenance operations like split and merge on a
// background goroutine.
fixups fixupProcessor
// cancel stops the background fixup processing goroutine.
cancel func()
}

// NewVectorIndex constructs a new vector index instance. Typically, only one
// VectorIndex instance should be created for each index in the process.
// NOTE: If "stopper" is not nil, then the vector index will start a background
// goroutine to process index fixups. When the index is no longer needed, the
// caller must call Close to shut down the background goroutine.
func NewVectorIndex(
ctx context.Context,
store vecstore.Store,
quantizer quantize.Quantizer,
options *VectorIndexOptions,
stopper *stop.Stopper,
) (*VectorIndex, error) {
vi := &VectorIndex{
options: *options,
Expand All @@ -158,9 +165,25 @@ func NewVectorIndex(

vi.fixups.Init(vi, options.Seed)

if stopper != nil {
// Start the background goroutine.
ctx, vi.cancel = stopper.WithCancelOnQuiesce(ctx)
err := stopper.RunAsyncTask(ctx, "vecindex-fixups", vi.fixups.Start)
if err != nil {
return nil, errors.Wrap(err, "starting fixup processor")
}
}

return vi, nil
}

// Close shuts down background fixup processing by invoking the cancel function
// stored on the index. It is a no-op when no cancel function has been set,
// which is the case when the index was constructed without a stopper.
func (vi *VectorIndex) Close() {
	if vi.cancel == nil {
		// No background goroutine was started, so there is nothing to stop.
		return
	}
	vi.cancel()
}

// CreateRoot creates an empty root partition in the store. This should only be
// called once when the index is first created.
func (vi *VectorIndex) CreateRoot(ctx context.Context, txn vecstore.Txn) error {
Expand Down
Loading

0 comments on commit 6d9007c

Please sign in to comment.