diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 04ec12cfa9..0250a46cfc 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -363,33 +363,6 @@ func (q *queue) Results(block bool) []*fetchResult { } // Regardless if closed or not, we can still deliver whatever we have results := q.resultCache.GetCompleted(maxResultsProcess) - for _, result := range results { - // Recalculate the result item weights to prevent memory exhaustion - size := result.Header.Size() - for _, uncle := range result.Uncles { - size += uncle.Size() - } - for _, receipt := range result.Receipts { - size += receipt.Size() - } - for _, tx := range result.Transactions { - size += tx.Size() - } - q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + - (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize - } - // Using the newly calibrated resultsize, figure out the new throttle limit - // on the result cache - throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) - throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) - - // Log some info at certain times - if time.Since(q.lastStatLog) > 60*time.Second { - q.lastStatLog = time.Now() - info := q.Stats() - info = append(info, "throttle", throttleThreshold) - log.Info("Downloader queue stats", info...) - } return results } @@ -824,6 +797,21 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, receiptReqTimer, len(receiptList), validate, reconstruct) } +func (q *queue) calculateResultSize(result *fetchResult) { + size := result.Header.Size() + for _, uncle := range result.Uncles { + size += uncle.Size() + } + for _, receipt := range result.Receipts { + size += receipt.Size() + } + for _, tx := range result.Transactions { + size += tx.Size() + } + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize +} + // deliver injects a data retrieval response into the results queue. // // Note, this method expects the queue lock to be already held for writing. The @@ -870,8 +858,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, } for _, header := range request.Headers[:i] { - if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil { - reconstruct(accepted, res) + if res, stale, throttle, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil { + if throttle { + log.Info( + "The downloader queue is throttled", + "threshold", q.resultCache.throttleThreshold, + "itemSize", q.resultSize, + "drop block from", header.Number.Uint64(), + ) + break + } else { + reconstruct(accepted, res) + q.calculateResultSize(res) + } } else { // else: betweeen here and above, some other peer filled this result, // or it was indeed a no-op. This should not happen, but if it does it's @@ -883,6 +882,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, delete(taskPool, hashes[accepted]) accepted++ } + // Using the newly calibrated resultsize, figure out the new throttle limit + // on the result cache + throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) + + // Log some info at certain times + if time.Since(q.lastStatLog) > 60*time.Second { + q.lastStatLog = time.Now() + info := q.stats() + info = append(info, "throttle", throttleThreshold) + log.Info("Downloader queue stats", info...) + } + // Return all failed or missing fetches to the queue for _, header := range request.Headers[accepted:] { taskQueue.Push(header, -int64(header.Number.Uint64())) diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index 21928c2a00..ffea3eb10c 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -92,15 +92,13 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro } // GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag -// is true, that means the header has already been delivered 'upstream'. This method -// does not bubble up the 'throttle' flag, since it's moot at the point in time when -// the item is downloaded and ready for delivery -func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) { +// is true, that means the header has already been delivered 'upstream'. +func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, bool, error) { r.lock.RLock() defer r.lock.RUnlock() - res, _, stale, _, err := r.getFetchResult(headerNumber) - return res, stale, err + res, _, stale, throttle, err := r.getFetchResult(headerNumber) + return res, stale, throttle, err } // getFetchResult returns the fetchResult corresponding to the given item, and diff --git a/les/downloader/queue.go b/les/downloader/queue.go index 04ec12cfa9..0250a46cfc 100644 --- a/les/downloader/queue.go +++ b/les/downloader/queue.go @@ -363,33 +363,6 @@ func (q *queue) Results(block bool) []*fetchResult { } // Regardless if closed or not, we can still deliver whatever we have results := q.resultCache.GetCompleted(maxResultsProcess) - for _, result := range results { - // Recalculate the result item weights to prevent memory exhaustion - size := result.Header.Size() - for _, uncle := range result.Uncles { - size += uncle.Size() - } - for _, receipt := range result.Receipts { - size += receipt.Size() - } - for _, tx := range result.Transactions { - size += tx.Size() - } - q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + - (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize - } - // Using the newly calibrated resultsize, figure out the new throttle limit - // on the result cache - throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) - throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) - - // Log some info at certain times - if time.Since(q.lastStatLog) > 60*time.Second { - q.lastStatLog = time.Now() - info := q.Stats() - info = append(info, "throttle", throttleThreshold) - log.Info("Downloader queue stats", info...) - } return results } @@ -824,6 +797,21 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, receiptReqTimer, len(receiptList), validate, reconstruct) } +func (q *queue) calculateResultSize(result *fetchResult) { + size := result.Header.Size() + for _, uncle := range result.Uncles { + size += uncle.Size() + } + for _, receipt := range result.Receipts { + size += receipt.Size() + } + for _, tx := range result.Transactions { + size += tx.Size() + } + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize +} + // deliver injects a data retrieval response into the results queue. // // Note, this method expects the queue lock to be already held for writing. The @@ -870,8 +858,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, } for _, header := range request.Headers[:i] { - if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil { - reconstruct(accepted, res) + if res, stale, throttle, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil { + if throttle { + log.Info( + "The downloader queue is throttled", + "threshold", q.resultCache.throttleThreshold, + "itemSize", q.resultSize, + "drop block from", header.Number.Uint64(), + ) + break + } else { + reconstruct(accepted, res) + q.calculateResultSize(res) + } } else { // else: betweeen here and above, some other peer filled this result, // or it was indeed a no-op. This should not happen, but if it does it's @@ -883,6 +882,19 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, delete(taskPool, hashes[accepted]) accepted++ } + // Using the newly calibrated resultsize, figure out the new throttle limit + // on the result cache + throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) + + // Log some info at certain times + if time.Since(q.lastStatLog) > 60*time.Second { + q.lastStatLog = time.Now() + info := q.stats() + info = append(info, "throttle", throttleThreshold) + log.Info("Downloader queue stats", info...) + } + // Return all failed or missing fetches to the queue for _, header := range request.Headers[accepted:] { taskQueue.Push(header, -int64(header.Number.Uint64())) diff --git a/les/downloader/resultstore.go b/les/downloader/resultstore.go index 21928c2a00..ffea3eb10c 100644 --- a/les/downloader/resultstore.go +++ b/les/downloader/resultstore.go @@ -92,15 +92,13 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro } // GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag -// is true, that means the header has already been delivered 'upstream'. This method -// does not bubble up the 'throttle' flag, since it's moot at the point in time when -// the item is downloaded and ready for delivery -func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) { +// is true, that means the header has already been delivered 'upstream'. +func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, bool, error) { r.lock.RLock() defer r.lock.RUnlock() - res, _, stale, _, err := r.getFetchResult(headerNumber) - return res, stale, err + res, _, stale, throttle, err := r.getFetchResult(headerNumber) + return res, stale, throttle, err } // getFetchResult returns the fetchResult corresponding to the given item, and