Skip to content

Commit

Permalink
Merge pull request #39 from alibaba/bugfix-block-queue
Browse files Browse the repository at this point in the history
bugfix: make ReqQueue put block when is full
  • Loading branch information
HuangXiaomeng authored Dec 3, 2024
2 parents 1533e57 + 44cff51 commit c175253
Showing 1 changed file with 3 additions and 13 deletions.
16 changes: 3 additions & 13 deletions internal/batch/req_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@

package batch

import (
"go.uber.org/atomic"
)

type ReqQueue struct {
maxSize int32
curSize *atomic.Int32
requests chan any
}

func NewReqQueue(maxSize int32) (q *ReqQueue) {
return &ReqQueue{
maxSize: maxSize,
curSize: atomic.NewInt32(0),
requests: make(chan any, maxSize),
}
}
Expand All @@ -39,7 +33,6 @@ func (q *ReqQueue) SubmitRequest(req any) {
return
}
q.requests <- req
q.curSize.Inc()
}

func (q *ReqQueue) RetrieveRequests(batchSize int32) []any {
Expand All @@ -56,20 +49,17 @@ func (q *ReqQueue) RetrieveRequests(batchSize int32) []any {

func (q *ReqQueue) pop() any {
select {
case req, ok := <-q.requests:
if ok {
q.curSize.Dec()
}
case req := <-q.requests:
return req
default:
return nil
}
}

func (q *ReqQueue) Size() int {
return int(q.curSize.Load())
return len(q.requests)
}

func (q *ReqQueue) Clear() {
close(q.requests)
q.requests = nil
}

0 comments on commit c175253

Please sign in to comment.