Skip to content

Commit

Permalink
(choria-io#104) avoid duplicates in task list
Browse files Browse the repository at this point in the history
Also default to 10000 when no limit is given

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Dec 26, 2022
1 parent 8f37499 commit 68926d7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
6 changes: 4 additions & 2 deletions ajc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ func main() {

default:
fmt.Fprintf(os.Stderr, "ajc runtime error: %v\n", err)
fmt.Fprintln(os.Stderr)
ajc.Usage(os.Args[1:])
if err != asyncjobs.ErrNoTasks {
fmt.Fprintln(os.Stderr)
ajc.Usage(os.Args[1:])
}
}

os.Exit(1)
Expand Down
41 changes: 25 additions & 16 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,11 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task,
return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

if limit <= 0 {
s.log.Debugf("Defaulting to task list limit of 10000")
limit = 10000
}

nfo, err := s.tasks.stream.State()
if err != nil {
return nil, err
Expand All @@ -934,29 +939,33 @@ func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task,
timeout, cancel := context.WithTimeout(ctx, time.Minute)

sub, err := s.nc.Subscribe(s.nc.NewRespInbox(), func(msg *nats.Msg) {
if len(msg.Data) > 0 {
task := &Task{}
err := json.Unmarshal(msg.Data, task)
if len(msg.Data) == 0 {
return
}

task := &Task{}
err := json.Unmarshal(msg.Data, task)
if err != nil {
return
}

select {
case out <- task:
md, err := msg.Metadata()
if err != nil {
return
}

select {
case out <- task:
md, err := msg.Metadata()
if err != nil {
return
}
done := atomic.AddInt32(&cnt, 1)
if md.NumPending == 0 || done == limit {
msg.Sub.Unsubscribe()
cancel()
}
default:
done := atomic.AddInt32(&cnt, 1)
if md.NumPending == 0 || done == limit {
msg.Sub.Unsubscribe()
cancel()
}
default:
msg.Sub.Unsubscribe()
cancel()
}

msg.Ack()
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 68926d7

Please sign in to comment.