diff --git a/internal/batch/req_queue.go b/internal/batch/req_queue.go index 2dd3dd3..e72f184 100644 --- a/internal/batch/req_queue.go +++ b/internal/batch/req_queue.go @@ -16,82 +16,50 @@ package batch -import ( - "sync" - - "github.com/alibaba/schedulerx-worker-go/logger" -) - type ReqQueue struct { - capacity int32 - requests []interface{} - lock sync.RWMutex + maxSize int32 + requests chan any } -func NewReqQueue(capacity int32) (q *ReqQueue) { +func NewReqQueue(maxSize int32) (q *ReqQueue) { return &ReqQueue{ - capacity: capacity, - requests: make([]interface{}, 0, capacity), + maxSize: maxSize, + requests: make(chan any, maxSize), } } -func (q *ReqQueue) SubmitRequest(request interface{}) { - q.push(request) +func (q *ReqQueue) SubmitRequest(req any) { + if req == nil { + return + } + q.requests <- req } -func (q *ReqQueue) RetrieveRequests(batchSize int32) []interface{} { - res := make([]interface{}, 0, batchSize) +func (q *ReqQueue) RetrieveRequests(batchSize int32) []any { + requests := make([]any, 0, batchSize) for i := int32(0); i < batchSize; i++ { - request := q.pop() - if request == nil { - // empty, just break + req := q.pop() + if req == nil { break } - res = append(res, request) - } - return res -} - -func (q *ReqQueue) push(req interface{}) { - if req != nil { - if q.capacity > 0 && int32(q.Size()) == q.capacity { - logger.Warnf("req queue is full, capacity: %d", q.capacity) - return - } - q.lock.Lock() - q.requests = append(q.requests, req) - q.lock.Unlock() + requests = append(requests, req) } + return requests } -func (q *ReqQueue) pop() interface{} { - if q.Size() == 0 { +func (q *ReqQueue) pop() any { + select { + case req := <-q.requests: + return req + default: return nil } - q.lock.Lock() - defer q.lock.Unlock() - req := q.requests[0] - q.requests = q.requests[1:] - return req -} - -func (q *ReqQueue) SetCapacity(capacity int32) { - q.lock.Lock() - defer q.lock.Unlock() - q.capacity = capacity - if int32(q.Size()) > q.capacity { - q.requests = q.requests[:q.capacity] - } } func (q *ReqQueue) Size() int { - q.lock.RLock() - defer q.lock.RUnlock() return len(q.requests) } func (q *ReqQueue) Clear() { - q.lock.Lock() - defer q.lock.Unlock() q.requests = nil }