From 4b2f79473d864cf407c97e4e9693c67974b2b492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=9F=E5=BF=A0?= Date: Mon, 2 Dec 2024 17:17:42 +0800 Subject: [PATCH 1/2] bugfix: make ReqQueue put block when is full --- internal/batch/req_queue.go | 80 ++++++++++++++----------------------- 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/internal/batch/req_queue.go b/internal/batch/req_queue.go index 2dd3dd3..11e5e16 100644 --- a/internal/batch/req_queue.go +++ b/internal/batch/req_queue.go @@ -17,81 +17,59 @@ package batch import ( - "sync" - - "github.com/alibaba/schedulerx-worker-go/logger" + "go.uber.org/atomic" ) type ReqQueue struct { - capacity int32 - requests []interface{} - lock sync.RWMutex + maxSize int32 + curSize *atomic.Int32 + requests chan any } -func NewReqQueue(capacity int32) (q *ReqQueue) { +func NewReqQueue(maxSize int32) (q *ReqQueue) { return &ReqQueue{ - capacity: capacity, - requests: make([]interface{}, 0, capacity), + maxSize: maxSize, + curSize: atomic.NewInt32(0), + requests: make(chan any, maxSize), } } -func (q *ReqQueue) SubmitRequest(request interface{}) { - q.push(request) +func (q *ReqQueue) SubmitRequest(req any) { + if req == nil { + return + } + q.requests <- req + q.curSize.Inc() } -func (q *ReqQueue) RetrieveRequests(batchSize int32) []interface{} { - res := make([]interface{}, 0, batchSize) +func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { + requests := make([]any, 0, batchSize) for i := int32(0); i < batchSize; i++ { - request := q.pop() - if request == nil { - // empty, just break + req := q.pop() + if req == nil { break } - res = append(res, request) + requests = append(requests, req) } - return res + return requests } -func (q *ReqQueue) push(req interface{}) { - if req != nil { - if q.capacity > 0 && int32(q.Size()) == q.capacity { - logger.Warnf("req queue is full, capacity: %d", q.capacity) - return +func (q *ReqQueue) pop() any { + select { + case req, ok := <-q.requests: + if ok { + q.curSize.Dec() } - q.lock.Lock() - q.requests = append(q.requests, req) - q.lock.Unlock() - } -} - -func (q *ReqQueue) pop() interface{} { - if q.Size() == 0 { + return req + default: return nil } - q.lock.Lock() - defer q.lock.Unlock() - req := q.requests[0] - q.requests = q.requests[1:] - return req -} - -func (q *ReqQueue) SetCapacity(capacity int32) { - q.lock.Lock() - defer q.lock.Unlock() - q.capacity = capacity - if int32(q.Size()) > q.capacity { - q.requests = q.requests[:q.capacity] - } } func (q *ReqQueue) Size() int { - q.lock.RLock() - defer q.lock.RUnlock() - return len(q.requests) + return int(q.curSize.Load()) } func (q *ReqQueue) Clear() { - q.lock.Lock() - defer q.lock.Unlock() - q.requests = nil + close(q.requests) } From 44cff513b16100ff6de4a1a10b4ee5a631cb0c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=9F=E5=BF=A0?= Date: Tue, 3 Dec 2024 11:35:12 +0800 Subject: [PATCH 2/2] bugfix: make ReqQueue put block when is full --- internal/batch/req_queue.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/internal/batch/req_queue.go b/internal/batch/req_queue.go index 11e5e16..e72f184 100644 --- a/internal/batch/req_queue.go +++ b/internal/batch/req_queue.go @@ -16,20 +16,14 @@ package batch -import ( - "go.uber.org/atomic" -) - type ReqQueue struct { maxSize int32 - curSize *atomic.Int32 requests chan any } func NewReqQueue(maxSize int32) (q *ReqQueue) { return &ReqQueue{ maxSize: maxSize, - curSize: atomic.NewInt32(0), requests: make(chan any, maxSize), } } @@ -39,7 +33,6 @@ func (q *ReqQueue) SubmitRequest(req any) { return } q.requests <- req - q.curSize.Inc() } func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { @@ -56,10 +49,7 @@ func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { func (q *ReqQueue) pop() any { select { - case req, ok := <-q.requests: - if ok { - q.curSize.Dec() - } + case req := <-q.requests: return req default: return nil @@ -67,9 +57,9 @@ func (q *ReqQueue) pop() any { } func (q *ReqQueue) Size() int { - return int(q.curSize.Load()) + return len(q.requests) } func (q *ReqQueue) Clear() { - close(q.requests) + q.requests = nil }