Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve eviction worker cpu use #4234

Merged
merged 3 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 59 additions & 67 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,94 +471,79 @@ func (c *Cache) MoveFromReserve(
return fmt.Errorf("failed creating batch: %w", err)
}

state := &cacheState{}
err = store.IndexStore().Get(state)
if err != nil {
return fmt.Errorf("failed reading cache state: %w", err)
}

newHead := &cacheEntry{Address: state.Head}
newTail := &cacheEntry{Address: state.Tail}

// if newTail is not found, this is a new cache and we need to set the
// tail and head
err = store.IndexStore().Get(newTail)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("failed getting tail entry %s: %w", newTail, err)
}

// this is a corner case where cache capacity is less than the no of items
// being added. In this case we need to only add the last c.capacity items
// from the addrs slice. This would generally happen in tests.
if len(addrs) > int(c.capacity) {
addrs = addrs[len(addrs)-int(c.capacity):]
}

var count uint64
entriesToAdd := make([]*cacheEntry, 0, len(addrs))
for _, addr := range addrs {
newEntry := &cacheEntry{Address: addr}
if has, err := store.IndexStore().Has(newEntry); err == nil && has {
entry := &cacheEntry{Address: addr}
if has, err := store.IndexStore().Has(entry); err == nil && has {
continue
}

if !newTail.Address.IsZero() {
newTail.Next = addr.Clone()
newEntry.Prev = newTail.Address.Clone()
err = batch.Put(newTail)
if err != nil {
return fmt.Errorf("failed updating new tail entry %s: %w", newTail, err)
}
} else {
// if we are here, it means its a fresh cache and we need to set
// the head and tail
newHead = newEntry
if len(entriesToAdd) > 0 {
entriesToAdd[len(entriesToAdd)-1].Next = addr.Clone()
entry.Prev = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
}
newTail = newEntry
count++
entriesToAdd = append(entriesToAdd, entry)
}

if count == 0 {
if len(entriesToAdd) == 0 {
return nil
}

// last entry after the loop is the new tail
err = batch.Put(newTail)
state := &cacheState{}
err = store.IndexStore().Get(state)
if err != nil {
return fmt.Errorf("failed updating new tail entry %s: %w", newTail, err)
return fmt.Errorf("failed reading cache state: %w", err)
}

var itemsToRemove uint64
if state.Count+count > c.capacity {
itemsToRemove = state.Count + count - c.capacity
if len(entriesToAdd) > int(c.capacity) {
entriesToAdd = entriesToAdd[len(entriesToAdd)-int(c.capacity):]
}

if itemsToRemove > 0 {
for i := 0; i < int(itemsToRemove); i++ {
err = store.IndexStore().Get(newHead)
if err != nil {
return fmt.Errorf("failed getting head entry %s: %w", newHead, err)
}
err = batch.Delete(newHead)
if err != nil {
return fmt.Errorf("failed deleting head entry %s: %w", newHead, err)
entriesToRemove := make([]*cacheEntry, 0, len(entriesToAdd))
if state.Count == 0 {
// this means that the cache is empty and we need to set the head and
// tail.
state.Head = entriesToAdd[0].Address.Clone()
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
} else {
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
if state.Count+uint64(len(entriesToAdd)) > c.capacity {
// this means that we need to remove some entries from the cache. The cache
// is kept at capacity, so we need to remove the first entries that were
// added to the cache.
removeItemCount := int(state.Count + uint64(len(entriesToAdd)) - c.capacity)
for i := 0; i < removeItemCount; i++ {
entry := &cacheEntry{Address: state.Head}
err = store.IndexStore().Get(entry)
if err != nil {
return fmt.Errorf("failed getting entry %s: %w", entry, err)
}
entriesToRemove = append(entriesToRemove, entry)
state.Head = entry.Next.Clone()
}
err = store.ChunkStore().Delete(ctx, newHead.Address)
if err != nil {
return fmt.Errorf("failed deleting chunk %s: %w", newHead.Address, err)
// if we removed all the entries from the cache, we need to set the head
// to the first item that we are adding. This is because the old tail Next
// is pointing to either nil or some incorrect address.
if removeItemCount == int(state.Count) {
state.Head = entriesToAdd[0].Address.Clone()
}
newHead = &cacheEntry{Address: newHead.Next}
}
// this is again a corner case where the cache capacity no. of items
// are being added, so the newHead is the first item in the slice.
if newHead.Address.IsZero() {
newHead = &cacheEntry{Address: addrs[0]}
}

for _, entry := range entriesToAdd {
err = batch.Put(entry)
if err != nil {
return fmt.Errorf("failed adding entry %s: %w", entry, err)
}
}

state.Tail = newTail.Address.Clone()
state.Head = newHead.Address.Clone()
state.Count += (count - itemsToRemove)
for _, entry := range entriesToRemove {
err = batch.Delete(entry)
if err != nil {
return fmt.Errorf("failed deleting entry %s: %w", entry, err)
}
}

state.Count += uint64(len(entriesToAdd)) - uint64(len(entriesToRemove))
err = batch.Put(state)
if err != nil {
return fmt.Errorf("failed updating state %s: %w", state, err)
Expand All @@ -568,6 +553,13 @@ func (c *Cache) MoveFromReserve(
return fmt.Errorf("batch commit: %w", err)
}

for _, entry := range entriesToRemove {
err = store.ChunkStore().Delete(ctx, entry.Address)
if err != nil {
return fmt.Errorf("failed deleting chunk %s: %w", entry.Address, err)
}
}

c.mtx.Lock()
c.size = state.Count
c.mtx.Unlock()
Expand Down
8 changes: 2 additions & 6 deletions pkg/storer/internal/reserve/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ func (b *batchRadiusItem) Namespace() string {
return "batchRadius"
}

// bin/batchID/ChunkAddr
// batchID/bin/ChunkAddr
func (b *batchRadiusItem) ID() string {
return batchBinToString(b.Bin, b.BatchID) + b.Address.ByteString()
}

func batchBinToString(bin uint8, batchID []byte) string {
return string(bin) + string(batchID)
return string(b.BatchID) + string(b.Bin) + b.Address.ByteString()
}

func (b *batchRadiusItem) String() string {
Expand Down
22 changes: 12 additions & 10 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
return err
}

// EvictBatchBin evicts all chunks from the bin provided.
// EvictBatchBin evicts all chunks from bins upto the bin provided.
func (r *Reserve) EvictBatchBin(
ctx context.Context,
txExecutor internal.TxExecutor,
Expand All @@ -303,9 +303,12 @@ func (r *Reserve) EvictBatchBin(
err := txExecutor.Execute(ctx, func(store internal.Storage) error {
return store.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &batchRadiusItem{} },
Prefix: batchBinToString(bin, batchID),
Prefix: string(batchID),
}, func(res storage.Result) (bool, error) {
batchRadius := res.Entry.(*batchRadiusItem)
if batchRadius.Bin >= bin {
return true, nil
}
evicted = append(evicted, batchRadius)
return false, nil
})
Expand All @@ -315,6 +318,7 @@ func (r *Reserve) EvictBatchBin(
}

batchCnt := 1000
evictionCompleted := 0

for i := 0; i < len(evicted); i += batchCnt {
end := i + batchCnt
Expand All @@ -337,20 +341,18 @@ func (r *Reserve) EvictBatchBin(
}
moveToCache = append(moveToCache, item.Address)
}
if err := r.cacheCb(ctx, store, moveToCache...); err != nil {
return err
}
return batch.Commit()
})
if err != nil {
return 0, err
}
err = txExecutor.Execute(ctx, func(store internal.Storage) error {
return r.cacheCb(ctx, store, moveToCache...)
})
if err != nil {
return 0, err
return evictionCompleted, err
}
evictionCompleted += end - i
}

return len(evicted), nil
return evictionCompleted, nil
}

func (r *Reserve) DeleteChunk(
Expand Down
Loading
Loading