From 44cff513b16100ff6de4a1a10b4ee5a631cb0c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=9F=E5=BF=A0?= Date: Tue, 3 Dec 2024 11:35:12 +0800 Subject: [PATCH] bugfix: make ReqQueue put block when is full --- internal/batch/req_queue.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/internal/batch/req_queue.go b/internal/batch/req_queue.go index 11e5e16..e72f184 100644 --- a/internal/batch/req_queue.go +++ b/internal/batch/req_queue.go @@ -16,20 +16,14 @@ package batch -import ( - "go.uber.org/atomic" -) - type ReqQueue struct { maxSize int32 - curSize *atomic.Int32 requests chan any } func NewReqQueue(maxSize int32) (q *ReqQueue) { return &ReqQueue{ maxSize: maxSize, - curSize: atomic.NewInt32(0), requests: make(chan any, maxSize), } } @@ -39,7 +33,6 @@ func (q *ReqQueue) SubmitRequest(req any) { return } q.requests <- req - q.curSize.Inc() } func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { @@ -56,10 +49,7 @@ func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { func (q *ReqQueue) pop() any { select { - case req, ok := <-q.requests: - if ok { - q.curSize.Dec() - } + case req := <-q.requests: return req default: return nil @@ -67,9 +57,9 @@ func (q *ReqQueue) pop() any { } func (q *ReqQueue) Size() int { - return int(q.curSize.Load()) + return len(q.requests) } func (q *ReqQueue) Clear() { - close(q.requests) + q.requests = nil }