diff --git a/ajc/info_command.go b/ajc/info_command.go index b853dfa..b145e77 100644 --- a/ajc/info_command.go +++ b/ajc/info_command.go @@ -6,16 +6,29 @@ package main import ( "fmt" + "time" + "github.com/choria-io/asyncjobs" "gopkg.in/alecthomas/kingpin.v2" ) +type infoCommand struct { + replicas uint + memory bool + retention time.Duration +} + func configureInfoCommand(app *kingpin.Application) { - app.Command("info", "Shows general Task and Queue information").Action(infoAction) + c := &infoCommand{} + + info := app.Command("info", "Shows general Task and Queue information").Action(c.infoAction) + info.Flag("replica", "When initializing, do so with this many replicas").Short('R').Default("1").UintVar(&c.replicas) + info.Flag("memory", "When initializing, do so with memory storage").BoolVar(&c.memory) + info.Flag("retention", "When initializing, sets how long Tasks are kept").DurationVar(&c.retention) } -func infoAction(_ *kingpin.ParseContext) error { - err := prepare() +func (c *infoCommand) infoAction(_ *kingpin.ParseContext) error { + err := prepare(asyncjobs.MemoryStorage(), asyncjobs.StoreReplicas(c.replicas), asyncjobs.TaskRetention(c.retention)) if err != nil { return err } diff --git a/go.mod b/go.mod index f2eddc0..6733987 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/AlecAivazis/survey/v2 v2.3.4 github.com/dustin/go-humanize v1.0.0 github.com/nats-io/jsm.go v0.0.31 - github.com/nats-io/nats-server/v2 v2.8.2 - github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81 + github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8 + github.com/nats-io/nats.go v1.15.0 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 github.com/prometheus/client_golang v1.12.1 diff --git a/go.sum b/go.sum index 35500d6..ebe50b1 100644 --- a/go.sum +++ b/go.sum @@ -193,12 +193,12 @@ github.com/nats-io/jsm.go v0.0.31/go.mod h1:LVb1bL1houzsI3nySNW/Omhg+d2kKnMHrgpN github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I= github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703/go.mod h1:5vic7C58BFEVltiZhs7Kq81q2WcEPhJPsmNv1FOrdv0= -github.com/nats-io/nats-server/v2 v2.8.2 h1:5m1VytMEbZx0YINvKY+X2gXdLNwP43uLXnFRwz8j8KE= -github.com/nats-io/nats-server/v2 v2.8.2/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= +github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8 h1:ZFqKNby5LIb2sfZIzhYbPwEFMlrD/LRx4QkGAnBu3mk= +github.com/nats-io/nats-server/v2 v2.8.3-0.20220504192053-f20fe2c2d8b8/go.mod h1:vIdpKz3OG+DCg4q/xVPdXHoztEyKDWRtykQ4N7hd7C4= github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.14.1-0.20220412004736-c75dfd54b52c/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81 h1:vlhkkk3NAd1BeSymqqqdbVuQLBcfg2CcfX7ciATu+kM= -github.com/nats-io/nats.go v1.14.1-0.20220503230705-96c14456be81/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.15.0 h1:3IXNBolWrwIUf2soxh6Rla8gPzYWEZQBUBK6RV21s+o= +github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y= github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/storage.go b/storage.go index 0fcad46..4b51391 100644 --- a/storage.go +++ b/storage.go @@ -510,6 +510,10 @@ func (s *jetStreamStorage) ConfigurationInfo() (*nats.KeyValueBucketStatus, erro } func (s *jetStreamStorage) TasksInfo() (*TasksInfo, error) { + if s.tasks == nil || s.tasks.stream == nil { + return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady) + } + res := &TasksInfo{ Time: time.Now().UTC(), } @@ -524,6 +528,10 @@ func (s *jetStreamStorage) TasksInfo() (*TasksInfo, error) { } func (s *jetStreamStorage) DeleteTaskByID(id string) error { + if s.tasks == nil || s.tasks.stream == nil { + return fmt.Errorf("%w: task store not initialized", ErrStorageNotReady) + } + msg, err := s.tasks.stream.ReadLastMessageForSubject(fmt.Sprintf(TasksStreamSubjectPattern, id)) if err != nil { if jsm.IsNatsError(err, 10037) { @@ -536,6 +544,10 @@ func (s *jetStreamStorage) DeleteTaskByID(id string) error { } func (s *jetStreamStorage) LoadTaskByID(id string) (*Task, error) { + if s.tasks == nil || s.tasks.stream == nil { + return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady) + } + msg, err := s.tasks.stream.ReadLastMessageForSubject(fmt.Sprintf(TasksStreamSubjectPattern, id)) if err != nil { if jsm.IsNatsError(err, 10037) { @@ -904,6 +916,10 @@ func (s *jetStreamStorage) ElectionStorage() (nats.KeyValue, error) { } func (s *jetStreamStorage) Tasks(ctx context.Context, limit int32) (chan *Task, error) { + if s.tasks == nil || s.tasks.stream == nil { + return nil, fmt.Errorf("%w: task store not initialized", ErrStorageNotReady) + } + nfo, err := s.tasks.stream.State() if err != nil { return nil, err