Skip to content

Commit

Permalink
Merge pull request #532 from ripienaar/wq_view
Browse files Browse the repository at this point in the history
Fix viewing work queues, except with time deltas
  • Loading branch information
ripienaar authored Apr 10, 2024
2 parents 4338e4a + ca7cf50 commit a84e2a3
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 31 deletions.
56 changes: 54 additions & 2 deletions msginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,62 @@ func ParseJSMsgMetadataReply(reply string) (info *MsgInfo, err error) {
domain = parts[2]
}

return &MsgInfo{stream, consumer, streamSeq, consumerSeq, delivered, pending, ts, domain}, nil
nfo := &MsgInfo{
stream: stream,
consumer: consumer,
sSeq: streamSeq,
cSeq: consumerSeq,
delivered: delivered,
pending: pending,
ts: ts,
domain: domain,
}

return nfo, nil
}

// ParseJSMsgMetadataDirect parses the DIRECT GET headers into a MsgInfo, in this case all consumer
// related properties will not be filled in as there is no consumer involved
func ParseJSMsgMetadataDirect(headers nats.Header) (*MsgInfo, error) {
nfo := &MsgInfo{
stream: headers.Get("Nats-Stream"),
}

sSeq, err := strconv.Atoi(headers.Get("Nats-Sequence"))
if err != nil {
return nil, err
}
nfo.sSeq = uint64(sSeq)

pending := headers.Get("Nats-Num-Pending")
if pending != "" {
pc, err := strconv.Atoi(pending)
if err != nil {
return nil, err
}
nfo.pending = uint64(pc)
}

ts, err := time.Parse(time.RFC3339, headers.Get("Nats-Time-Stamp"))
if err != nil {
return nil, err
}
nfo.ts = ts

return &MsgInfo{}, nil
}

// ParseJSMsgMetadata parse the reply subject metadata to determine message metadata
//
// When given a message obtained using Direct Get APIs several fields will be filled in but
// consumer related ones will not as there is no consumer involved in that case
func ParseJSMsgMetadata(m *nats.Msg) (info *MsgInfo, err error) {
return ParseJSMsgMetadataReply(m.Reply)
switch {
case len(m.Reply) > 0:
return ParseJSMsgMetadataReply(m.Reply)
case len(m.Header.Get("Nats-Sequence")) > 0:
return ParseJSMsgMetadataDirect(m.Header)
default:
return nil, fmt.Errorf("unknown metadata format")
}
}
84 changes: 56 additions & 28 deletions stream_pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
Expand Down Expand Up @@ -114,10 +115,26 @@ func (p *StreamPager) start(stream *Stream, mgr *Manager, opts ...PagerOption) e
return err
}

err = p.createConsumer()
if err != nil {
p.close()
return err
if p.useDirect {
if p.startSeq > 0 {
p.curSeq = uint64(p.startSeq)
} else {
nfo, err := p.stream.State()
if err != nil {
return err
}
p.curSeq = nfo.FirstSeq
}

if p.startDelta > 0 {
return fmt.Errorf("workqueue paging does not support time delta starting positions")
}
} else {
err = p.createConsumer()
if err != nil {
p.close()
return err
}
}

p.started = true
Expand All @@ -126,22 +143,30 @@ func (p *StreamPager) start(stream *Stream, mgr *Manager, opts ...PagerOption) e
}

func (p *StreamPager) directGetBatch() error {
for i := 1; i <= p.pageSize; i++ {
req := api.JSApiMsgGetRequest{Seq: p.curSeq, NextFor: p.filterSubject}
// the idea here is to fetch a batch matching page to fill up the queue but direct makes this difficult
// because different stream replicas can answer and also the message that causes 404 replies much faster than
// one that includes data so it ends up out of order and just weird
//
// so until the batch api launch we have to do a get on demand to preserve order and to handle the 404 correctly

rj, err := json.Marshal(req)
if err != nil {
return err
}
req := api.JSApiMsgGetRequest{Seq: p.curSeq, NextFor: p.filterSubject}

err = p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, rj)
if err != nil {
return err
}
rj, err := json.Marshal(req)
if err != nil {
return err
}

p.curSeq++
if p.mgr.trace {
log.Printf(">>> %s\n%s\n\n", p.stream.DirectSubject(), string(rj))
}

err = p.mgr.nc.PublishRequest(p.stream.DirectSubject(), p.sub.Subject, rj)
if err != nil {
return err
}

p.curSeq++

return nil
}

Expand Down Expand Up @@ -173,16 +198,21 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er
p.mu.Lock()
defer p.mu.Unlock()

if p.useDirect {
err = p.directGetBatch()
if err != nil {
return nil, false, err
}
}

if p.seen == p.pageSize || p.seen == -1 {
p.seen = 0

if p.useDirect {
err = p.directGetBatch()
} else {
if !p.useDirect {
err = p.fetchBatch()
}
if err != nil {
return nil, false, err
if err != nil {
return nil, false, err
}
}
}

Expand All @@ -191,6 +221,10 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er

select {
case msg := <-p.q:
if p.mgr.trace {
log.Printf("<<< (%d) %s\n%v\n", len(msg.Data), msg.Header, string(msg.Data))
}

p.seen++

status := msg.Header.Get("Status")
Expand All @@ -199,13 +233,7 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er
}

if p.useDirect {
nfo, err := ParseJSMsgMetadata(msg)
if err != nil {
p.curSeq = nfo.StreamSequence()
}
if nfo.Pending() == 0 {
return msg, true, fmt.Errorf("last message reached")
}
msg.Subject = msg.Header.Get("Nats-Subject")
} else {
msg.Ack()
}
Expand Down
56 changes: 55 additions & 1 deletion stream_pager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func TestPager(t *testing.T) {
if err != nil {
t.Fatalf("pager creation failed: %s", err)
}
defer pgr.Close()

seen := 0
pages := 0
Expand Down Expand Up @@ -62,5 +61,60 @@ func TestPager(t *testing.T) {
if len(known) != 0 {
t.Fatalf("expected no consumers got %v", known)
}
}

func TestPagerWQ(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

str, err := mgr.NewStream("PAGERTEST", jsm.Subjects("js.in.pager"), jsm.WorkQueueRetention(), jsm.AllowDirect())
if err != nil {
t.Fatalf("stream create failed: %s", err)
}

_, err = str.NewConsumer(jsm.ConsumerName("PULL"))
if err != nil {
t.Fatalf("consumer create failed: %s", err)
}

for i := 1; i <= 200; i++ {
_, err = nc.Request("js.in.pager", []byte(fmt.Sprintf("message %d", i)), time.Second)
if err != nil {
t.Fatalf("publish failed: %s", err)
}
}

_, err = str.PageContents(jsm.PagerSize(25), jsm.PagerStartDelta(time.Hour))
if err == nil || err.Error() != "workqueue paging does not support time delta starting positions" {
t.Fatalf("pager creation did not fail for time delta: %v", err)
}

pgr, err := str.PageContents(jsm.PagerSize(25))
if err != nil {
t.Fatalf("pager creation failed: %s", err)
}

seen := 0
pages := 0
for {
_, last, err := pgr.NextMsg(context.Background())
if err != nil && last && seen == 200 && pages == 8 {
break
}

if err != nil {
t.Fatalf("next failed seen %d pages %d: %s", seen, pages, err)
}

seen++
if last {
pages++
}
}

err = pgr.Close()
if err != nil {
t.Fatalf("close failed")
}
}

0 comments on commit a84e2a3

Please sign in to comment.