Skip to content

Commit

Permalink
(choria-io#67) add some init options to ajc info
Browse files Browse the repository at this point in the history
This is not the full storage management feature I have in mind
but enough to get the most annoying niggles sorted now

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed May 5, 2022
1 parent a961cb2 commit 1d1db44
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 9 deletions.
19 changes: 16 additions & 3 deletions ajc/info_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,29 @@ package main

import (
"fmt"
"time"

"github.com/choria-io/asyncjobs"
"gopkg.in/alecthomas/kingpin.v2"
)

type infoCommand struct {
replicas uint
memory bool
retention time.Duration
}

func configureInfoCommand(app *kingpin.Application) {
app.Command("info", "Shows general Task and Queue information").Action(infoAction)
c := &infoCommand{}

info := app.Command("info", "Shows general Task and Queue information").Action(c.infoAction)
info.Flag("replica", "When initializing, do so with this many replicas").Short('R').Default("1").UintVar(&c.replicas)
info.Flag("memory", "When initializing, do so with memory storage").BoolVar(&c.memory)
info.Flag("retention", "When initializing, sets how long Tasks are kept").DurationVar(&c.retention)
}

func infoAction(_ *kingpin.ParseContext) error {
err := prepare()
func (c *infoCommand) infoAction(_ *kingpin.ParseContext) error {
err := prepare(asyncjobs.MemoryStorage(), asyncjobs.StoreReplicas(c.replicas), asyncjobs.TaskRetention(c.retention))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/AlecAivazis/survey/v2 v2.3.4
github.com/dustin/go-humanize v1.0.0
github.com/nats-io/jsm.go v0.0.31
github.com/nats-io/nats-server/v2 v2.8.2
github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81
github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8
github.com/nats-io/nats.go v1.15.0
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/prometheus/client_golang v1.12.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ github.com/nats-io/jsm.go v0.0.31/go.mod h1:LVb1bL1houzsI3nySNW/Omhg+d2kKnMHrgpN
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0=
github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE=
github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8 h1:ZFqKNby5LIb2sfZIzhYbPwEFMlrD/LRx4QkGAnBu3mk=
github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4=
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.14.1-0.20220412004736-c75dfd54b52c/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81 h1:vlhkkk3NAd1BeSymqqqdbVuQLBcfg2CcfX7ciATu+kM=
github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.15.0 h1:3IXNBolWrwIUf2soxh6Rla8gPzYWEZQBUBK6RV21s+o=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y=
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
16 changes: 16 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ func (s *jetStreamStorage) ConfigurationInfo() (*nats.KeyValueBucketStatus, erro
}

func (s *jetStreamStorage) TasksInfo() (*TasksInfo, error) {
if s.tasks == nil || s.tasks.stream == nil {
return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

res := &TasksInfo{
Time: time.Now().UTC(),
}
Expand All @@ -524,6 +528,10 @@ func (s *jetStreamStorage) TasksInfo() (*TasksInfo, error) {
}

func (s *jetStreamStorage) DeleteTaskByID(id string) error {
if s.tasks == nil || s.tasks.stream == nil {
return fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

msg, err := s.tasks.stream.ReadLastMessageForSubject(fmt.Sprintf(TasksStreamSubjectPattern, id))
if err != nil {
if jsm.IsNatsError(err, 10037) {
Expand All @@ -536,6 +544,10 @@ func (s *jetStreamStorage) DeleteTaskByID(id string) error {
}

func (s *jetStreamStorage) LoadTaskByID(id string) (*Task, error) {
if s.tasks == nil || s.tasks.stream == nil {
return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

msg, err := s.tasks.stream.ReadLastMessageForSubject(fmt.Sprintf(TasksStreamSubjectPattern, id))
if err != nil {
if jsm.IsNatsError(err, 10037) {
Expand Down Expand Up @@ -904,6 +916,10 @@ func (s *jetStreamStorage) ElectionStorage() (nats.KeyValue, error) {
}

func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task, error) {
if s.tasks == nil || s.tasks.stream == nil {
return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady)
}

nfo, err := s.tasks.stream.State()
if err != nil {
return nil, err
Expand Down

0 comments on commit 1d1db44

Please sign in to comment.