diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 5ee88c7..f82ecb4 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -41,7 +41,7 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j.createJetstreamContext() - j.createStream() + j.UpdateOrCreateStream() return j } @@ -72,13 +72,13 @@ func (j *Jetstream) createJetstreamContext() { } } -func (j *Jetstream) createStream() { +func (j *Jetstream) UpdateOrCreateStream() { for _, stream := range j.config.Streams { info, err := j.jetstream.StreamInfo(stream.Name) if err == nil { j.updateStream(stream, info) } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { - j.addStream(stream) + j.createStream(stream) } else { j.logger.Panic("could not add subject", zap.Error(err)) } @@ -98,7 +98,7 @@ func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) { j.logger.Info("stream updated") } -func (j *Jetstream) addStream(stream Stream) { +func (j *Jetstream) createStream(stream Stream) { _, err := j.jetstream.AddStream(&nats.StreamConfig{ Name: stream.Name, Subjects: []string{stream.Subject},