Skip to content

Commit

Permalink
Merge pull request #40 from alibaba/develop
Browse files Browse the repository at this point in the history
v1.0.8
  • Loading branch information
HuangXiaomeng authored Dec 12, 2024
2 parents 6dac403 + c175253 commit 7b517c8
Showing 1 changed file with 21 additions and 53 deletions.
74 changes: 21 additions & 53 deletions internal/batch/req_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,50 @@

package batch

import (
"sync"

"github.com/alibaba/schedulerx-worker-go/logger"
)

type ReqQueue struct {
capacity int32
requests []interface{}
lock sync.RWMutex
maxSize int32
requests chan any
}

func NewReqQueue(capacity int32) (q *ReqQueue) {
func NewReqQueue(maxSize int32) (q *ReqQueue) {
return &ReqQueue{
capacity: capacity,
requests: make([]interface{}, 0, capacity),
maxSize: maxSize,
requests: make(chan any, maxSize),
}
}

func (q *ReqQueue) SubmitRequest(request interface{}) {
q.push(request)
func (q *ReqQueue) SubmitRequest(req any) {
if req == nil {
return
}
q.requests <- req
}

func (q *ReqQueue) RetrieveRequests(batchSize int32) []interface{} {
res := make([]interface{}, 0, batchSize)
func (q *ReqQueue) RetrieveRequests(batchSize int32) []any {
requests := make([]any, 0, batchSize)
for i := int32(0); i < batchSize; i++ {
request := q.pop()
if request == nil {
// empty, just break
req := q.pop()
if req == nil {
break
}
res = append(res, request)
}
return res
}

func (q *ReqQueue) push(req interface{}) {
if req != nil {
if q.capacity > 0 && int32(q.Size()) == q.capacity {
logger.Warnf("req queue is full, capacity: %d", q.capacity)
return
}
q.lock.Lock()
q.requests = append(q.requests, req)
q.lock.Unlock()
requests = append(requests, req)
}
return requests
}

func (q *ReqQueue) pop() interface{} {
if q.Size() == 0 {
func (q *ReqQueue) pop() any {
select {
case req := <-q.requests:
return req
default:
return nil
}
q.lock.Lock()
defer q.lock.Unlock()
req := q.requests[0]
q.requests = q.requests[1:]
return req
}

func (q *ReqQueue) SetCapacity(capacity int32) {
q.lock.Lock()
defer q.lock.Unlock()
q.capacity = capacity
if int32(q.Size()) > q.capacity {
q.requests = q.requests[:q.capacity]
}
}

func (q *ReqQueue) Size() int {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.requests)
}

func (q *ReqQueue) Clear() {
q.lock.Lock()
defer q.lock.Unlock()
q.requests = nil
}

0 comments on commit 7b517c8

Please sign in to comment.