Skip to content

Commit

Permalink
fix: store message queue
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <[email protected]>
  • Loading branch information
gfanton committed Jun 8, 2023
1 parent 6f6f117 commit 1c38eea
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 84 deletions.
148 changes: 148 additions & 0 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package queue

import (
"container/heap"
"container/list"
"context"
"sync"
)

type ICounter interface {
Counter() uint64
}

// A priorityMessageQueue implements heap.Interface and holds Items.
type PriorityQueue[T ICounter] struct {
messages []T
muMessages sync.RWMutex
}

func NewPriorityQueue[T ICounter]() *PriorityQueue[T] {
queue := &PriorityQueue[T]{
messages: []T{},
}

heap.Init(queue)
return queue
}

func (pq *PriorityQueue[T]) Add(m T) {
pq.muMessages.Lock()
heap.Push(pq, m)
pq.muMessages.Unlock()
}

func (pq *PriorityQueue[T]) Next() (item T) {
pq.muMessages.Lock()
if len(pq.messages) > 0 {
item = heap.Pop(pq).(T)
}
pq.muMessages.Unlock()
return
}

func (pq *PriorityQueue[T]) Size() (l int) {
pq.muMessages.RLock()
l = pq.Len()
pq.muMessages.RUnlock()
return
}

func (pq *PriorityQueue[T]) Len() (l int) {
l = len(pq.messages)
return
}

func (pq *PriorityQueue[T]) Less(i, j int) bool {
// We want Pop to give us the lowest, not highest, priority so we use lower than here.
return pq.messages[i].Counter() < pq.messages[j].Counter()
}

func (pq *PriorityQueue[T]) Swap(i, j int) {
pq.messages[i], pq.messages[j] = pq.messages[j], pq.messages[i]
}

func (pq *PriorityQueue[T]) Push(x interface{}) {
pq.messages = append(pq.messages, x.(T))
}

func (pq *PriorityQueue[T]) Pop() (item interface{}) {
var null T
if n := len(pq.messages); n > 0 {
item = pq.messages[n-1]
pq.messages, pq.messages[n-1] = pq.messages[:n-1], null
}
return item
}

func NewSimpleQueue[T any]() *SimpleQueue[T] {
return &SimpleQueue[T]{
items: list.New(),
notify: newItemNotify(),
}
}

// A priorityMessageQueue implements heap.Interface and holds Items.
type SimpleQueue[T any] struct {
items *list.List
muMessages sync.RWMutex

notify *itemNotify
}

func (q *SimpleQueue[T]) Add(m T) {
q.muMessages.Lock()
_ = q.items.PushBack(m)
q.notify.Broadcast()
q.muMessages.Unlock()
}

func (q *SimpleQueue[T]) Pop() (m T, ok bool) {
q.muMessages.Lock()
if front := q.items.Front(); front != nil {
m = q.items.Remove(front).(T)
ok = true
}
q.muMessages.Unlock()

return
}

func (q *SimpleQueue[T]) WaitForNewItem(ctx context.Context) (item T, ok bool) {
for {
if item, ok = q.Pop(); ok {
return
}

if ok = q.notify.Wait(ctx); !ok {
return
}
}
}

type itemNotify struct {
signal chan struct{}
muSignal sync.Mutex
}

func newItemNotify() *itemNotify {
return &itemNotify{
signal: make(chan struct{}, 1),
}
}

func (m *itemNotify) Wait(ctx context.Context) (ok bool) {
select {
case <-m.signal:
return true
case <-ctx.Done():
return false
}
}

func (m *itemNotify) Broadcast() {
select {
case m.signal <- struct{}{}:
default:
}
}
138 changes: 138 additions & 0 deletions internal/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package queue

import (
"container/list"
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type testSimpleQueue = *SimpleQueue[int]

func newTestSimpleQueue() testSimpleQueue {
return &SimpleQueue[int]{
items: list.New(),
notify: newItemNotify(),
}
}

func TestQueue(t *testing.T) {
queue := newTestSimpleQueue()

e, ok := queue.Pop()
require.Equal(t, 0, e)
require.False(t, ok)

queue.Add(1)
e, ok = queue.Pop()
require.Equal(t, 1, e)
require.True(t, ok)

e, ok = queue.Pop()
require.Equal(t, 0, e)
require.False(t, ok)
}

func TestSyncQueue(t *testing.T) {
cases := []struct{ N int }{
{1}, {10}, {100}, {1000}, {10000},
}

for _, tc := range cases {
name := fmt.Sprintf("%d_elements", tc.N)
t.Run(name, func(t *testing.T) {
queue := newTestSimpleQueue()

for i := 0; i < tc.N; i++ {
queue.Add(i + 1)
}

for i := 0; i < tc.N; i++ {
e, ok := queue.Pop()
require.Equal(t, i+1, e)
require.True(t, ok)
}
})
}
}

func TestAsyncQueue(t *testing.T) {
cases := []struct{ N int }{
{1}, {10}, {100}, {1000}, {10000},
}

for _, tc := range cases {
name := fmt.Sprintf("%d_elements", tc.N)
t.Run(name, func(t *testing.T) {
queue := newTestSimpleQueue()

wg := sync.WaitGroup{}

wg.Add(tc.N)
elems := map[int]struct{}{}
for i := 0; i < tc.N; i++ {
elems[i+1] = struct{}{}
go func(i int) {
queue.Add(i + 1)
wg.Done()
}(i)
}

wg.Wait()

for i := 0; i < tc.N; i++ {
e, ok := queue.Pop()
require.True(t, ok)

_, exist := elems[e]
require.True(t, exist)
delete(elems, e)
}

require.Len(t, elems, 0)
})
}
}

func TestWaitnForItemQueue(t *testing.T) {
cases := []struct{ N int }{
{1}, {10}, {100}, {1000}, {10000},
}

for _, tc := range cases {
name := fmt.Sprintf("%d_elements", tc.N)
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := newTestSimpleQueue()

cc := make(chan int, tc.N)
go func() {
for {
e, ok := queue.WaitForNewItem(ctx)
if !ok {
return
}

// simulate latency
time.Sleep(time.Microsecond * 10)
cc <- e
}
}()

for i := 0; i < tc.N; i++ {
queue.Add(i + 1)
}

for i := 0; i < tc.N; i++ {
e := <-cc
require.Equal(t, i+1, e)
}
})
}
}
Loading

0 comments on commit 1c38eea

Please sign in to comment.