Skip to content

Commit

Permalink
Merge pull request #540 from ripienaar/load_sd
Browse files Browse the repository at this point in the history
Allow loading server StreamDetails JSON directly
  • Loading branch information
ripienaar authored Jun 17, 2024
2 parents eda7fbd + 01e66c2 commit 84c82f2
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 28 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.21

require (
github.com/dustin/go-humanize v1.0.1
github.com/expr-lang/expr v1.16.7
github.com/expr-lang/expr v1.16.9
github.com/google/go-cmp v0.6.0
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.9
github.com/nats-io/nats-server/v2 v2.11.0-preview.2
github.com/nats-io/nats.go v1.35.0
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nuid v1.0.1
golang.org/x/net v0.25.0
golang.org/x/text v0.15.0
golang.org/x/net v0.26.0
golang.org/x/text v0.16.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -22,8 +22,8 @@ require (
github.com/nats-io/jwt/v2 v2.5.7 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/expr-lang/expr v1.16.7 h1:gCIiHt5ODA0xIaDbD0DPKyZpM9Drph3b3lolYAYq2Kw=
github.com/expr-lang/expr v1.16.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.0 h1:sQF6YqWMi+SCXpsmS3fd21oPy/vSddwZry4JnmltHVk=
github.com/google/go-tpm v0.9.0/go.mod h1:FkNVkc6C+IsvDI9Jw1OveJmxGZUUaKxtrpOS47QWKfU=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand All @@ -22,8 +22,8 @@ github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-preview.2 h1:tT/UeBbFzHRzwy77T/+/Rbw58XP9F3CY3VmtcDltZ68=
github.com/nats-io/nats-server/v2 v2.11.0-preview.2/go.mod h1:ILDVzrTqMco4rQMOgEZimBjJHb1oZDlz1J+qhJtZlRM=
github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk=
github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -32,15 +32,15 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
9 changes: 2 additions & 7 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,12 @@ func streamPublish(t *testing.T, nc *nats.Conn, subj string, msg []byte) {
func startJSServer(t *testing.T) (*natsd.Server, *nats.Conn, *jsm.Manager) {
t.Helper()

d, err := os.MkdirTemp("", "jstest")
if err != nil {
t.Fatalf("temp dir could not be made: %s", err)
}

opts := &natsd.Options{
JetStream: true,
StoreDir: d,
Port: -1,
StoreDir: t.TempDir(),
Host: "localhost",
LogFile: "/dev/stdout",
HTTPPort: -1,
Trace: true,
}

Expand Down
43 changes: 43 additions & 0 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,49 @@ func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts
return m.streamFromConfig(&resp.Config, resp.StreamInfo), nil
}

// LoadFromStreamDetailBytes creates a stream info from the server StreamDetails in json format, the StreamDetails should
// be created with Config and Consumers options set
func (m *Manager) LoadFromStreamDetailBytes(sd []byte) (stream *Stream, consumers []*Consumer, err error) {
stream = &Stream{
mgr: m,
}

var nfo api.StreamInfo
err = json.Unmarshal(sd, &nfo)
if err != nil {
return nil, nil, err
}

stream.lastInfo = &nfo
stream.cfg = &nfo.Config

if stream.Name() == "" {
return nil, nil, fmt.Errorf("invalid stream details, ensure configuration is included")
}

var cons struct {
Consumers []*api.ConsumerInfo `json:"consumer_detail"`
}
err = json.Unmarshal(sd, &cons)
if err != nil {
return nil, nil, err
}

for _, consumer := range cons.Consumers {
c := Consumer{
name: consumer.Name,
stream: stream.Name(),
cfg: &consumer.Config,
lastInfo: consumer,
mgr: m,
}

consumers = append(consumers, &c)
}

return stream, consumers, nil
}

func (m *Manager) streamFromConfig(cfg *api.StreamConfig, info *api.StreamInfo) (stream *Stream) {
s := &Stream{cfg: cfg, mgr: m}
if info != nil {
Expand Down
69 changes: 69 additions & 0 deletions streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package jsm_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand All @@ -24,6 +25,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"

"github.com/nats-io/jsm.go"
Expand Down Expand Up @@ -168,6 +170,73 @@ func TestLoadStream(t *testing.T) {
}
}

func TestLoadFromStreamDetailBytes(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

q1, err := mgr.NewStream("q1", jsm.Subjects("in.q1"), jsm.FileStorage())
checkErr(t, err, "create failed")
c1, err := q1.NewConsumer(jsm.DurableName("C1"))
checkErr(t, err, "create failed")

_, err = q1.NewConsumer(jsm.DurableName("C2"))
checkErr(t, err, "create failed")

_, err = nc.Request("in.q1", nil, time.Second)
checkErr(t, err, "publish failed")
msg, err := c1.NextMsg()
checkErr(t, err, "next failed")
err = msg.Ack()
checkErr(t, err, "ack failed")

jsz, err := srv.Jsz(&server.JSzOptions{Streams: true, Config: true, Consumer: true})
checkErr(t, err, "jsz failed")
if len(jsz.AccountDetails) == 0 {
t.Fatalf("jsz should have at least one account")
}
if len(jsz.AccountDetails[0].Streams) == 0 {
t.Fatalf("jsz should have at least one stream")
}

sdb, err := json.Marshal(jsz.AccountDetails[0].Streams[0])
checkErr(t, err, "json marshal failed")

stream, consumers, err := mgr.LoadFromStreamDetailBytes(sdb)
checkErr(t, err, "load failed")

if stream.Name() != "q1" || !cmp.Equal(stream.Subjects(), []string{"in.q1"}) {
t.Fatalf("invalid stream")
}

state, err := stream.LatestState()
checkErr(t, err, "latest state failed")

if state.Msgs != 1 {
t.Fatalf("invalid state")
}
if state.Consumers != 2 {
t.Fatalf("invalid consumers")
}

if len(consumers) != 2 {
t.Fatalf("invalid consumers")
}

if consumers[0].Name() != "C1" || consumers[0].StreamName() != "q1" {
t.Fatalf("invalid c1 consumer")
}
af, err := consumers[0].AcknowledgedFloor()
checkErr(t, err, "ackfloor failed")
if af.Stream != 1 {
t.Fatalf("invalid ack floor")
}

if consumers[1].Name() != "C2" || consumers[1].StreamName() != "q1" {
t.Fatalf("invalid c2 consumer")
}
}

func TestStream_Reset(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
Expand Down

0 comments on commit 84c82f2

Please sign in to comment.