Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: batch id prefix for batchradius #4235

Merged
merged 8 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 59 additions & 67 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,94 +471,79 @@ func (c *Cache) MoveFromReserve(
return fmt.Errorf("failed creating batch: %w", err)
}

state := &cacheState{}
err = store.IndexStore().Get(state)
if err != nil {
return fmt.Errorf("failed reading cache state: %w", err)
}

newHead := &cacheEntry{Address: state.Head}
newTail := &cacheEntry{Address: state.Tail}

// if newTail is not found, this is a new cache and we need to set the
// tail and head
err = store.IndexStore().Get(newTail)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("failed getting tail entry %s: %w", newTail, err)
}

// this is a corner case where cache capacity is less than the number of
// items being added. In this case we need to only add the last c.capacity
// items from the addrs slice. This would generally happen in tests.
if len(addrs) > int(c.capacity) {
addrs = addrs[len(addrs)-int(c.capacity):]
}

var count uint64
entriesToAdd := make([]*cacheEntry, 0, len(addrs))
for _, addr := range addrs {
newEntry := &cacheEntry{Address: addr}
if has, err := store.IndexStore().Has(newEntry); err == nil && has {
entry := &cacheEntry{Address: addr}
if has, err := store.IndexStore().Has(entry); err == nil && has {
continue
}

if !newTail.Address.IsZero() {
newTail.Next = addr.Clone()
newEntry.Prev = newTail.Address.Clone()
err = batch.Put(newTail)
if err != nil {
return fmt.Errorf("failed updating new tail entry %s: %w", newTail, err)
}
} else {
// if we are here, it means its a fresh cache and we need to set
// the head and tail
newHead = newEntry
if len(entriesToAdd) > 0 {
entriesToAdd[len(entriesToAdd)-1].Next = addr.Clone()
entry.Prev = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
}
newTail = newEntry
count++
entriesToAdd = append(entriesToAdd, entry)
}

if count == 0 {
if len(entriesToAdd) == 0 {
return nil
}

// last entry after the loop is the new tail
err = batch.Put(newTail)
state := &cacheState{}
err = store.IndexStore().Get(state)
if err != nil {
return fmt.Errorf("failed updating new tail entry %s: %w", newTail, err)
return fmt.Errorf("failed reading cache state: %w", err)
}

var itemsToRemove uint64
if state.Count+count > c.capacity {
itemsToRemove = state.Count + count - c.capacity
if len(entriesToAdd) > int(c.capacity) {
entriesToAdd = entriesToAdd[len(entriesToAdd)-int(c.capacity):]
}

if itemsToRemove > 0 {
for i := 0; i < int(itemsToRemove); i++ {
err = store.IndexStore().Get(newHead)
if err != nil {
return fmt.Errorf("failed getting head entry %s: %w", newHead, err)
}
err = batch.Delete(newHead)
if err != nil {
return fmt.Errorf("failed deleting head entry %s: %w", newHead, err)
entriesToRemove := make([]*cacheEntry, 0, len(entriesToAdd))
if state.Count == 0 {
// this means that the cache is empty and we need to set the head and
// tail.
state.Head = entriesToAdd[0].Address.Clone()
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
} else {
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
if state.Count+uint64(len(entriesToAdd)) > c.capacity {
// this means that we need to remove some entries from the cache. The cache
// is kept at capacity, so we need to remove the first entries that were
// added to the cache.
removeItemCount := int(state.Count + uint64(len(entriesToAdd)) - c.capacity)
for i := 0; i < removeItemCount; i++ {
entry := &cacheEntry{Address: state.Head}
err = store.IndexStore().Get(entry)
if err != nil {
return fmt.Errorf("failed getting entry %s: %w", entry, err)
}
entriesToRemove = append(entriesToRemove, entry)
state.Head = entry.Next.Clone()
}
err = store.ChunkStore().Delete(ctx, newHead.Address)
if err != nil {
return fmt.Errorf("failed deleting chunk %s: %w", newHead.Address, err)
// if we removed all the entries from the cache, we need to set the head
// to the first item that we are adding. This is because the old tail Next
// is pointing to either nil or some incorrect address.
if removeItemCount == int(state.Count) {
state.Head = entriesToAdd[0].Address.Clone()
}
newHead = &cacheEntry{Address: newHead.Next}
}
// this is again a corner case where exactly cache-capacity items are
// being added, so the newHead is the first item in the slice.
if newHead.Address.IsZero() {
newHead = &cacheEntry{Address: addrs[0]}
}

for _, entry := range entriesToAdd {
err = batch.Put(entry)
if err != nil {
return fmt.Errorf("failed adding entry %s: %w", entry, err)
}
}

state.Tail = newTail.Address.Clone()
state.Head = newHead.Address.Clone()
state.Count += (count - itemsToRemove)
for _, entry := range entriesToRemove {
err = batch.Delete(entry)
if err != nil {
return fmt.Errorf("failed deleting entry %s: %w", entry, err)
}
}

state.Count += uint64(len(entriesToAdd)) - uint64(len(entriesToRemove))
err = batch.Put(state)
if err != nil {
return fmt.Errorf("failed updating state %s: %w", state, err)
Expand All @@ -568,6 +553,13 @@ func (c *Cache) MoveFromReserve(
return fmt.Errorf("batch commit: %w", err)
}

for _, entry := range entriesToRemove {
err = store.ChunkStore().Delete(ctx, entry.Address)
if err != nil {
return fmt.Errorf("failed deleting chunk %s: %w", entry.Address, err)
}
}

c.mtx.Lock()
c.size = state.Count
c.mtx.Unlock()
Expand Down
8 changes: 2 additions & 6 deletions pkg/storer/internal/reserve/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ func (b *batchRadiusItem) Namespace() string {
return "batchRadius"
}

// bin/batchID/ChunkAddr
// batchID/bin/ChunkAddr
func (b *batchRadiusItem) ID() string {
return batchBinToString(b.Bin, b.BatchID) + b.Address.ByteString()
}

func batchBinToString(bin uint8, batchID []byte) string {
return string(bin) + string(batchID)
return string(b.BatchID) + string(b.Bin) + b.Address.ByteString()
}

func (b *batchRadiusItem) String() string {
Expand Down
13 changes: 9 additions & 4 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
return err
}

// EvictBatchBin evicts all chunks from the bin provided.
// EvictBatchBin evicts all chunks from bins up to the bin provided.
func (r *Reserve) EvictBatchBin(
ctx context.Context,
txExecutor internal.TxExecutor,
Expand All @@ -303,9 +303,12 @@ func (r *Reserve) EvictBatchBin(
err := txExecutor.Execute(ctx, func(store internal.Storage) error {
return store.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &batchRadiusItem{} },
Prefix: batchBinToString(bin, batchID),
Prefix: string(batchID),
}, func(res storage.Result) (bool, error) {
batchRadius := res.Entry.(*batchRadiusItem)
if batchRadius.Bin >= bin {
return true, nil
}
evicted = append(evicted, batchRadius)
return false, nil
})
Expand All @@ -315,6 +318,7 @@ func (r *Reserve) EvictBatchBin(
}

batchCnt := 1000
evictionCompleted := 0

for i := 0; i < len(evicted); i += batchCnt {
end := i + batchCnt
Expand Down Expand Up @@ -343,11 +347,12 @@ func (r *Reserve) EvictBatchBin(
return batch.Commit()
})
if err != nil {
return 0, err
return evictionCompleted, err
}
evictionCompleted += end - i
}

return len(evicted), nil
return evictionCompleted, nil
}

func (r *Reserve) DeleteChunk(
Expand Down
44 changes: 10 additions & 34 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
cleanupDur = time.Hour * 6
)

func reserveUpdateBatchLockKey(batchID []byte, bin uint8) string {
return fmt.Sprintf("%s%s%d", reserveUpdateLockKey, string(batchID), bin)
func reserveUpdateBatchLockKey(batchID []byte) string {
return fmt.Sprintf("%s%s", reserveUpdateLockKey, string(batchID))
}

var errMaxRadius = errors.New("max radius reached")
Expand Down Expand Up @@ -352,10 +352,7 @@ func (db *DB) ReservePutter() storage.Putter {
var (
newIndex bool
)
lockKey := reserveUpdateBatchLockKey(
chunk.Stamp().BatchID(),
db.po(chunk.Address()),
)
lockKey := reserveUpdateBatchLockKey(chunk.Stamp().BatchID())
db.lock.Lock(lockKey)
err = db.Execute(ctx, func(tx internal.Storage) error {
newIndex, err = db.reserve.Put(ctx, tx, chunk)
Expand Down Expand Up @@ -464,41 +461,20 @@ func (db *DB) evictBatch(
} else {
db.metrics.EvictedChunkCount.Add(float64(evicted))
}
}()

for b := uint8(0); b < upToBin; b++ {

select {
case <-ctx.Done():
return evicted, ctx.Err()
default:
}

binEvicted, err := func() (int, error) {
lockKey := reserveUpdateBatchLockKey(batchID, b)
db.lock.Lock(lockKey)
defer db.lock.Unlock(lockKey)

return db.reserve.EvictBatchBin(ctx, db, b, batchID)
}()
evicted += binEvicted

// if there was an error, we still need to update the chunks that have already
// been evicted from the reserve
db.logger.Debug(
"reserve eviction",
"bin", b,
"evicted", binEvicted,
"uptoBin", upToBin,
"evicted", evicted,
"batchID", hex.EncodeToString(batchID),
"new_size", db.reserve.Size(),
)
}()

if err != nil {
return evicted, err
}
}
lockKey := reserveUpdateBatchLockKey(batchID)
db.lock.Lock(lockKey)
defer db.lock.Unlock(lockKey)

return evicted, nil
return db.reserve.EvictBatchBin(ctx, db, upToBin, batchID)
}

func (db *DB) reserveCleanup(ctx context.Context) error {
Expand Down
27 changes: 12 additions & 15 deletions pkg/storer/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,8 @@ func TestUnreserveCap(t *testing.T) {

putter := storer.ReservePutter()

gotUnreserveSignal := make(chan struct{})
go func() {
defer close(gotUnreserveSignal)
c, unsub := storer.Events().Subscribe("reserveUnreserved")
defer unsub()
<-c
}()
c, unsub := storer.Events().Subscribe("reserveUnreserved")
defer unsub()

for b := 0; b < 5; b++ {
for i := uint64(0); i < chunksPerPO; i++ {
Expand All @@ -292,14 +287,16 @@ func TestUnreserveCap(t *testing.T) {
}
}

// wait for unreserve signal
<-gotUnreserveSignal

err = spinlock.Wait(time.Second*45, func() bool {
return storer.ReserveSize() == capacity
})
if err != nil {
t.Fatal(err)
done:
for {
select {
case <-c:
if storer.ReserveSize() == capacity {
break done
}
case <-time.After(time.Second * 45):
t.Fatal("timeout waiting for reserve to reach capacity")
}
}

for po, chunks := range chunksPO {
Expand Down
1 change: 1 addition & 0 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
}

repo, dbCloser, err = initDiskRepository(ctx, dirPath, opts)
if err != nil {
return nil, err
Expand Down
Loading