diff --git a/ajc/queue_command.go b/ajc/queue_command.go index 3111cac..6d16fad 100644 --- a/ajc/queue_command.go +++ b/ajc/queue_command.go @@ -26,6 +26,8 @@ type queueCommand struct { memory bool replicas int discardOld bool + maxBytes int64 + maxBytesSet bool } func configureQueueCommand(app *fisk.Application) { @@ -43,6 +45,7 @@ func configureQueueCommand(app *fisk.Application) { add.Flag("memory", "Store the Queue in memory").BoolVar(&c.memory) add.Flag("replicas", "Number of storage replicas to configure").Default("1").IntVar(&c.replicas) add.Flag("discard-old", "When full, discard old entries").BoolVar(&c.discardOld) + add.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes) queues.Command("list", "List Queues").Alias("ls").Action(c.lsAction) @@ -65,6 +68,7 @@ func configureQueueCommand(app *fisk.Application) { cfg.Flag("run-time", "Maximum run-time to allow per task").Default("-1s").DurationVar(&c.maxTime) cfg.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default("-2").IntVar(&c.maxConcurrent) cfg.Flag("replicas", "Number of storage replicas to configure").Default("-1").IntVar(&c.replicas) + cfg.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes) } func (c *queueCommand) addAction(_ *fisk.ParseContext) error { @@ -88,6 +92,10 @@ func (c *queueCommand) addAction(_ *fisk.ParseContext) error { MaxConcurrent: c.maxConcurrent, } + if c.maxBytesSet { + queue.MaxBytes = c.maxBytes + } + err = admin.PrepareQueue(queue, c.replicas, c.memory) if err != nil { return err @@ -146,6 +154,10 @@ func (c *queueCommand) configureAction(_ *fisk.ParseContext) error { ccfg.MaxAckPending = c.maxConcurrent } + if c.maxBytesSet { + scfg.MaxBytes = c.maxBytes + } + mgr, _, err := admin.TasksStore() if err != nil { return err diff --git a/ajc/task_command.go b/ajc/task_command.go index 07733e4..9edea52 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -41,6 +41,8 @@ type taskCommand struct { ed25519Seed string ed25519PubKey string optionalSigs bool + maxBytes int64 + maxBytesSet bool limit int json bool @@ -85,6 +87,7 @@ func configureTaskCommand(app *fisk.Application) { init.Flag("memory", "Use memory as a storage backend").BoolVar(&c.memory) init.Flag("retention", "Sets how long Tasks are kept in the Task Store").DurationVar(&c.retention) init.Flag("replicas", "How many replicas to keep in a JetStream cluster").Default("1").IntVar(&c.replicas) + init.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes) config := tasks.Command("configure", "Configures the Task storage").Alias("config").Alias("cfg").Action(c.configAction) config.Arg("retention", "Sets how long Tasks are kept in the Task Store").Required().DurationVar(&c.retention) @@ -172,7 +175,7 @@ func (c *taskCommand) initAction(_ *fisk.ParseContext) error { return err } - err = admin.PrepareTasks(c.memory, c.replicas, c.retention) + err = admin.PrepareTasks(c.memory, c.replicas, c.retention, c.maxBytes, c.maxBytesSet) if err != nil { return err } diff --git a/ajc/util.go b/ajc/util.go index 83e25dd..2cbdc25 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -209,6 +209,7 @@ func showQueue(q *asyncjobs.QueueInfo) { fmt.Printf(" Memory Based: %t\n", q.Stream.Config.Storage == api.MemoryStorage) fmt.Printf(" Replicas: %d\n", q.Stream.Config.Replicas) fmt.Printf(" Archive Period: %s\n", humanizeDuration(q.Stream.Config.MaxAge)) + fmt.Printf(" Max Bytes: %d\n", q.Stream.Config.MaxBytes) fmt.Printf(" Max Task Tries: %d\n", q.Consumer.Config.MaxDeliver) fmt.Printf(" Max Run Time: %s\n", humanizeDuration(q.Consumer.Config.AckWait)) fmt.Printf(" Max Concurrent: %d\n", q.Consumer.Config.MaxAckPending) diff --git a/asyncjobs.go b/asyncjobs.go index 9c5dbe6..b77fb4a 100644 --- a/asyncjobs.go +++ b/asyncjobs.go @@ -22,6 +22,8 @@ const ( DefaultMaxTries = 10 // DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting DefaultQueueMaxConcurrent = 100 + // DefaultMaxBytes when not configured for a queue defaults to 10Mb + DefaultMaxBytes = 10485760 ) // StorageAdmin is helpers to support the CLI mainly, this leaks a bunch of details about JetStream @@ -34,8 +36,8 @@ type StorageAdmin interface { DeleteQueue(name string) error PrepareQueue(q *Queue, replicas int, memory bool) error ConfigurationInfo() (*nats.KeyValueBucketStatus, error) - PrepareConfigurationStore(memory bool, replicas int) error - PrepareTasks(memory bool, replicas int, retention time.Duration) error + PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error + PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error DeleteTaskByID(id string) error TasksInfo() (*TasksInfo, error) Tasks(ctx context.Context, limit int32) (chan *Task, error) @@ -68,8 +70,8 @@ type Storage interface { TerminateItem(ctx context.Context, item *ProcessItem) error PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error) PrepareQueue(q *Queue, replicas int, memory bool) error - PrepareTasks(memory bool, replicas int, retention time.Duration) error - PrepareConfigurationStore(memory bool, replicas int) error + PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error + PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error SaveScheduledTask(st *ScheduledTask, update bool) error LoadScheduledTaskByName(name string) (*ScheduledTask, error) DeleteScheduledTaskByName(name string) error diff --git a/client.go b/client.go index 05dd29e..d1ba4a4 100644 --- a/client.go +++ b/client.go @@ -290,12 +290,12 @@ func (c *Client) startPrometheus() { } func (c *Client) setupStreams() error { - err := c.storage.PrepareTasks(c.opts.memoryStore, c.opts.replicas, c.opts.taskRetention) + err := c.storage.PrepareTasks(c.opts.memoryStore, c.opts.replicas, c.opts.taskRetention, c.opts.maxBytes, c.opts.maxBytesSet) if err != nil { return err } - return c.storage.PrepareConfigurationStore(c.opts.memoryStore, c.opts.replicas) + return c.storage.PrepareConfigurationStore(c.opts.memoryStore, c.opts.replicas, c.opts.maxBytes, c.opts.maxBytesSet) } func nowPointer() *time.Time { diff --git a/client_options.go b/client_options.go index a8ab621..affeba9 100644 --- a/client_options.go +++ b/client_options.go @@ -31,6 +31,8 @@ type ClientOpts struct { publicKey ed25519.PublicKey publicKeyFile string optionalTaskSignatures bool + maxBytes int64 + maxBytesSet bool nc *nats.Conn } @@ -313,3 +315,11 @@ func TaskSignaturesOptional() ClientOpt { return nil } } + +func MaxBytes(maxBytes int64) ClientOpt { + return func(opts *ClientOpts) error { + opts.maxBytes = maxBytes + opts.maxBytesSet = true + return nil + } +} diff --git a/queue.go b/queue.go index c228390..a8540e6 100644 --- a/queue.go +++ b/queue.go @@ -30,6 +30,8 @@ type Queue struct { MaxConcurrent int `json:"max_concurrent"` // NoCreate will not try to create a queue, will bind to an existing one or fail NoCreate bool + // MaxBytes is the maximum amount of bytes that can be stored in the queue + MaxBytes int64 `json:"max_bytes"` mu sync.Mutex storage Storage @@ -62,6 +64,7 @@ func newDefaultQueue() *Queue { MaxRunTime: time.Minute, MaxTries: 100, MaxConcurrent: DefaultQueueMaxConcurrent, + MaxBytes: DefaultMaxBytes, MaxAge: 0, DiscardOld: false, mu: sync.Mutex{}, diff --git a/storage.go b/storage.go index 4d2ed6b..fc70fe6 100644 --- a/storage.go +++ b/storage.go @@ -417,6 +417,9 @@ func (s *jetStreamStorage) createQueue(q *Queue, replicas int, memory bool) erro } else { opts = append(opts, jsm.DiscardNew()) } + if q.MaxBytes > -1 { + opts = append(opts, jsm.MaxBytes(q.MaxBytes)) + } var err error @@ -726,7 +729,7 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err return nil } -func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) error { +func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error { var err error if replicas == 0 { @@ -745,12 +748,18 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) kv, err := js.KeyValue(ConfigBucketName) if err == nats.ErrBucketNotFound { - kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + cfg := &nats.KeyValueConfig{ Bucket: ConfigBucketName, Description: "Choria Async Jobs Configuration", Storage: storage, Replicas: replicas, - }) + } + + if maxBytes > -1 { + cfg.MaxBytes = maxBytes + } + + kv, err = js.CreateKeyValue(cfg) } if err != nil { return err @@ -760,13 +769,19 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) kv, err = js.KeyValue(LeaderElectionBucketName) if err == nats.ErrBucketNotFound { - kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + cfg := &nats.KeyValueConfig{ Bucket: LeaderElectionBucketName, Description: "Choria Async Jobs Leader Elections", Storage: storage, Replicas: replicas, TTL: 10 * time.Second, - }) + } + + if maxBytesSet { + cfg.MaxBytes = maxBytes + } + + kv, err = js.CreateKeyValue(cfg) } if err != nil { return err @@ -778,7 +793,7 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) } -func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention time.Duration) error { +func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error { var err error if replicas == 0 { @@ -800,6 +815,10 @@ func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention tim opts = append(opts, jsm.MaxAge(retention)) + if maxBytesSet { + opts = append(opts, jsm.MaxBytes(maxBytes)) + } + s.tasks = &taskStorage{mgr: s.mgr} s.tasks.stream, err = s.mgr.LoadOrNewStream(TasksStreamName, opts...) if err != nil { diff --git a/storage_test.go b/storage_test.go index 6a23086..6e33d40 100644 --- a/storage_test.go +++ b/storage_test.go @@ -42,7 +42,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) wctx, cancel := context.WithTimeout(ctx, time.Second) @@ -61,7 +61,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) for i := 0; i < 2; i++ { @@ -114,7 +114,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) tasks, err := storage.ScheduledTasks(ctx) @@ -128,7 +128,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) for i := 0; i < 10; i++ { @@ -154,7 +154,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) _, err = storage.configBucket.Put("scheduled_tasks.test", []byte("{invalid")) @@ -170,7 +170,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) _, err = storage.LoadScheduledTaskByName("test") @@ -183,7 +183,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) st, _, err := newScheduledTask("daily", "@daily", "DEFAULT", "email:new", nil) @@ -206,7 +206,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) st, _, err := newScheduledTask("daily", "@daily", "DEFAULT", "email:new", nil) @@ -229,7 +229,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) st, _, err := newScheduledTask("daily", "@daily", "DEFAULT", "email:new", nil) @@ -250,7 +250,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(true, 1) + err = storage.PrepareConfigurationStore(true, 1, -1, false) Expect(err).ToNot(HaveOccurred()) kvs, err := storage.configBucket.Status() @@ -265,7 +265,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareConfigurationStore(false, 1) + err = storage.PrepareConfigurationStore(false, 1, -1, false) Expect(err).ToNot(HaveOccurred()) kvs, err := storage.configBucket.Status() @@ -283,7 +283,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) Expect(storage.tasks.stream.Storage()).To(Equal(api.MemoryStorage)) @@ -295,7 +295,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) Expect(storage.tasks.stream.MaxAge()).To(Equal(time.Hour)) @@ -307,7 +307,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(false, 1, 0) + err = storage.PrepareTasks(false, 1, 0, -1, false) Expect(err).ToNot(HaveOccurred()) stream := storage.tasks.stream @@ -329,7 +329,7 @@ var _ = Describe("Storage", func() { q := testQueue() err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task1, err := NewTask("ginkgo", "test") @@ -577,7 +577,7 @@ var _ = Describe("Storage", func() { q := testQueue() err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", "test") @@ -616,7 +616,7 @@ var _ = Describe("Storage", func() { q := testQueue() err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) @@ -647,7 +647,7 @@ var _ = Describe("Storage", func() { withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) nfo, err := storage.TasksInfo() Expect(err).ToNot(HaveOccurred()) @@ -666,7 +666,7 @@ var _ = Describe("Storage", func() { q1 := &Queue{Name: "Q1"} Expect(storage.PrepareQueue(q1, 1, true)).ToNot(HaveOccurred()) - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) task, err := NewTask("x", "x") Expect(err).ToNot(HaveOccurred()) @@ -693,7 +693,7 @@ var _ = Describe("Storage", func() { q1 := &Queue{Name: "Q1"} Expect(storage.PrepareQueue(q1, 1, true)).ToNot(HaveOccurred()) - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) task, err := NewTask("x", "x") Expect(err).ToNot(HaveOccurred()) @@ -761,7 +761,7 @@ var _ = Describe("Storage", func() { q := testQueue() err = storage.PrepareQueue(q, 1, true) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) @@ -795,7 +795,7 @@ var _ = Describe("Storage", func() { q := testQueue() - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) Expect(storage.RetryTaskByID(context.Background(), q, "123")).To(Equal(ErrTaskNotFound)) }) }) @@ -807,7 +807,7 @@ var _ = Describe("Storage", func() { q := testQueue() Expect(storage.PrepareQueue(q, 1, true)).ToNot(HaveOccurred()) - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) @@ -856,7 +856,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) q := testQueue() @@ -882,7 +882,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) q := testQueue() @@ -907,7 +907,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) q := testQueue() @@ -957,7 +957,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) @@ -982,7 +982,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) @@ -1023,7 +1023,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) @@ -1046,7 +1046,7 @@ var _ = Describe("Storage", func() { withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - Expect(storage.PrepareTasks(true, 1, time.Hour)).ToNot(HaveOccurred()) + Expect(storage.PrepareTasks(true, 1, time.Hour, -1, false)).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil) Expect(err).ToNot(HaveOccurred()) @@ -1071,7 +1071,7 @@ var _ = Describe("Storage", func() { storage, err := newJetStreamStorage(nc, retryForTesting, &defaultLogger{}) Expect(err).ToNot(HaveOccurred()) - err = storage.PrepareTasks(true, 1, time.Hour) + err = storage.PrepareTasks(true, 1, time.Hour, -1, false) Expect(err).ToNot(HaveOccurred()) task, err := NewTask("ginkgo", nil)