Skip to content
This repository has been archived by the owner on Feb 18, 2025. It is now read-only.

Commit

Permalink
feat: improve the throttle threshold when syncing
Browse files Browse the repository at this point in the history
Currently, the new threshold is calculated when fetching the result from the result
queue and is only applied to the next reserve/fetch. However, when the data is
large, processing a result may take so long that the threshold is not updated
early enough to slow down further reserves/fetches. As a result, memory usage
can spike very high. In this commit, we re-calculate the threshold when handling
a response. If the result queue is throttled, we discard the response and push
the fetch task back onto the queue for a later re-fetch. This may increase
network bandwidth usage, but it helps to regulate memory consumption.
  • Loading branch information
minh-bq committed May 18, 2023
1 parent a095527 commit 977d580
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 70 deletions.
70 changes: 41 additions & 29 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,33 +363,6 @@ func (q *queue) Results(block bool) []*fetchResult {
}
// Regardless if closed or not, we can still deliver whatever we have
results := q.resultCache.GetCompleted(maxResultsProcess)
for _, result := range results {
// Recalculate the result item weights to prevent memory exhaustion
size := result.Header.Size()
for _, uncle := range result.Uncles {
size += uncle.Size()
}
for _, receipt := range result.Receipts {
size += receipt.Size()
}
for _, tx := range result.Transactions {
size += tx.Size()
}
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)

// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
}
return results
}

Expand Down Expand Up @@ -824,6 +797,21 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
receiptReqTimer, len(receiptList), validate, reconstruct)
}

// calculateResultSize folds the size of a freshly delivered fetch result into
// the queue's running exponential moving average of result sizes. The average
// is later used to derive the throttle threshold on the result cache, so that
// memory usage stays bounded while syncing.
func (q *queue) calculateResultSize(result *fetchResult) {
	// Sum the encoded size of every component of the delivered block.
	itemSize := result.Header.Size()
	for _, u := range result.Uncles {
		itemSize += u.Size()
	}
	for _, r := range result.Receipts {
		itemSize += r.Size()
	}
	for _, t := range result.Transactions {
		itemSize += t.Size()
	}
	// Blend the new sample into the moving average, weighted by
	// blockCacheSizeWeight.
	weight := common.StorageSize(blockCacheSizeWeight)
	q.resultSize = weight*itemSize + (1-weight)*q.resultSize
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
Expand Down Expand Up @@ -870,8 +858,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
}

for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
reconstruct(accepted, res)
if res, stale, throttle, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
if throttle {
log.Info(
"The downloader queue is throttled",
"threshold", q.resultCache.throttleThreshold,
"itemSize", q.resultSize,
"drop block from", header.Number.Uint64(),
)
break
} else {
reconstruct(accepted, res)
q.calculateResultSize(res)
}
} else {
// else: between here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's
Expand All @@ -883,6 +882,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
delete(taskPool, hashes[accepted])
accepted++
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)

// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
info := q.stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
}

// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
Expand Down
10 changes: 4 additions & 6 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,13 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
}

// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
// is true, that means the header has already been delivered 'upstream'. This method
// does not bubble up the 'throttle' flag, since it's moot at the point in time when
// the item is downloaded and ready for delivery
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
// is true, that means the header has already been delivered 'upstream'.
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()

res, _, stale, _, err := r.getFetchResult(headerNumber)
return res, stale, err
res, _, stale, throttle, err := r.getFetchResult(headerNumber)
return res, stale, throttle, err
}

// getFetchResult returns the fetchResult corresponding to the given item, and
Expand Down
70 changes: 41 additions & 29 deletions les/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,33 +363,6 @@ func (q *queue) Results(block bool) []*fetchResult {
}
// Regardless if closed or not, we can still deliver whatever we have
results := q.resultCache.GetCompleted(maxResultsProcess)
for _, result := range results {
// Recalculate the result item weights to prevent memory exhaustion
size := result.Header.Size()
for _, uncle := range result.Uncles {
size += uncle.Size()
}
for _, receipt := range result.Receipts {
size += receipt.Size()
}
for _, tx := range result.Transactions {
size += tx.Size()
}
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)

// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
}
return results
}

Expand Down Expand Up @@ -824,6 +797,21 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
receiptReqTimer, len(receiptList), validate, reconstruct)
}

// calculateResultSize folds the size of a freshly delivered fetch result into
// the queue's running exponential moving average of result sizes. The average
// is later used to derive the throttle threshold on the result cache, so that
// memory usage stays bounded while syncing.
func (q *queue) calculateResultSize(result *fetchResult) {
	// Sum the encoded size of every component of the delivered block.
	itemSize := result.Header.Size()
	for _, u := range result.Uncles {
		itemSize += u.Size()
	}
	for _, r := range result.Receipts {
		itemSize += r.Size()
	}
	for _, t := range result.Transactions {
		itemSize += t.Size()
	}
	// Blend the new sample into the moving average, weighted by
	// blockCacheSizeWeight.
	weight := common.StorageSize(blockCacheSizeWeight)
	q.resultSize = weight*itemSize + (1-weight)*q.resultSize
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
Expand Down Expand Up @@ -870,8 +858,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
}

for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
reconstruct(accepted, res)
if res, stale, throttle, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
if throttle {
log.Info(
"The downloader queue is throttled",
"threshold", q.resultCache.throttleThreshold,
"itemSize", q.resultSize,
"drop block from", header.Number.Uint64(),
)
break
} else {
reconstruct(accepted, res)
q.calculateResultSize(res)
}
} else {
// else: between here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's
Expand All @@ -883,6 +882,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
delete(taskPool, hashes[accepted])
accepted++
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)

// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
info := q.stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
}

// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
Expand Down
10 changes: 4 additions & 6 deletions les/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,13 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
}

// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
// is true, that means the header has already been delivered 'upstream'. This method
// does not bubble up the 'throttle' flag, since it's moot at the point in time when
// the item is downloaded and ready for delivery
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
// is true, that means the header has already been delivered 'upstream'.
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()

res, _, stale, _, err := r.getFetchResult(headerNumber)
return res, stale, err
res, _, stale, throttle, err := r.getFetchResult(headerNumber)
return res, stale, throttle, err
}

// getFetchResult returns the fetchResult corresponding to the given item, and
Expand Down

0 comments on commit 977d580

Please sign in to comment.