From 2a864b1b88671f3a8af2d45e8358098d57116448 Mon Sep 17 00:00:00 2001 From: Jeffrey lean <57609485+jeffreylean@users.noreply.github.com> Date: Wed, 2 Aug 2023 02:29:59 +0800 Subject: [PATCH] Feat/nats multi table support (#133) * add nats multi table sink support using NATS header * add multiple nats split-to-table supports * fix gcs test * add nats test config file --------- Co-authored-by: JeffreyLean --- Dockerfile | 2 +- internal/config/config.go | 14 +- internal/ingress/nats/jetstream/client.go | 67 +++++++ internal/ingress/nats/nats.go | 188 +++++++------------- internal/ingress/nats/nats_test.go | 87 +++------ internal/ingress/nats/nats_test_config.yaml | 8 + internal/server/server.go | 7 +- internal/server/server_ingest.go | 5 +- 8 files changed, 187 insertions(+), 191 deletions(-) create mode 100644 internal/ingress/nats/jetstream/client.go create mode 100644 internal/ingress/nats/nats_test_config.yaml diff --git a/Dockerfile b/Dockerfile index bae0075b..620f01b7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,7 @@ RUN mkdir -p /go/src/talaria COPY . src/talaria RUN cd src/talaria && go build . && test -x talaria -FROM debian:latest AS base +FROM debian:bullseye AS base ARG MAINTAINER=roman.atachiants@gmail.com LABEL maintainer=${MAINTAINER} diff --git a/internal/config/config.go b/internal/config/config.go index f742933f..83af6b89 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -107,11 +107,15 @@ type S3SQS struct { // NATS represents NATS consumer configuration type NATS struct { - Subject string `json:"subject" yaml:"subject" env:"SUBJECT"` - Host string `json:"host" yaml:"host" env:"HOST"` - Port int32 `json:"port" yaml:"port" env:"PORT"` - Stream string `json:"stream" yaml:"stream" env:"STREAM"` - Queue string `json:"queue" yaml:"queue" env:"QUEUE"` + Host string `json:"host" yaml:"host" env:"HOST"` + Port int32 `json:"port" yaml:"port" env:"PORT"` + Split []SplitWriter `json:"split" yaml:"split" env:"SPLIT"` +} + +type SplitWriter struct { + Subject string `json:"subject" yaml:"subject" env:"SUBJECT"` + Table string `json:"table" yaml:"table" env:"TABLE"` + QueueGroup string `json:"queueGroup" yaml:"queueGroup" env:"QUEUE_GROUP"` } // Presto represents the Presto configuration diff --git a/internal/ingress/nats/jetstream/client.go b/internal/ingress/nats/jetstream/client.go new file mode 100644 index 00000000..5a1a966f --- /dev/null +++ b/internal/ingress/nats/jetstream/client.go @@ -0,0 +1,67 @@ +package jetstream + +import ( + "fmt" + "log" + + "github.com/kelindar/talaria/internal/config" + "github.com/kelindar/talaria/internal/monitor" + "github.com/nats-io/nats.go" +) + +type JSClient interface { + Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) + QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) + Publish(subj string, data []byte, opts ...nats.PubOpt) (*nats.PubAck, error) + // PublishMsg publishes a Msg to JetStream. + PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) + // AddStream creates a stream. + AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) + // DeleteStream deletes a stream. + DeleteStream(name string, opts ...nats.JSOpt) error +} + +type NatsClient interface { + Close() +} + +type Client struct { + Context JSClient + Server NatsClient +} + +// Create new jetstream client. 
+func New(conf *config.NATS, monitor monitor.Monitor) (*Client, error) { + nc, err := nats.Connect(fmt.Sprintf("%s:%d", conf.Host, conf.Port), nats.ReconnectHandler(func(_ *nats.Conn) { + log.Println("Successfully renonnect") + }), nats.ClosedHandler(func(nc *nats.Conn) { + log.Printf("Connection close due to %q", nc.LastError()) + monitor.Error(nc.LastError()) + }), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Printf("Got disconnected. Reason: %q\n", nc.LastError()) + monitor.Error(nc.LastError()) + }), nats.ErrorHandler(natsErrHandler)) + if err != nil { + return nil, err + } + + js, err := nc.JetStream() + if err != nil { + return nil, err + } + client := &Client{js, nc} + + return client, nil +} + +func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) { + if natsErr == nats.ErrSlowConsumer { + pendingMsgs, _, err := sub.Pending() + if err != nil { + log.Printf("couldn't get pending messages: %v", err) + return + } + log.Printf("Falling behind with %d pending messages on subject %q.\n", + pendingMsgs, sub.Subject) + } +} diff --git a/internal/ingress/nats/nats.go b/internal/ingress/nats/nats.go index 1090c290..0c1851d0 100644 --- a/internal/ingress/nats/nats.go +++ b/internal/ingress/nats/nats.go @@ -4,181 +4,127 @@ import ( "context" "encoding/json" "fmt" - "log" "github.com/kelindar/talaria/internal/config" + "github.com/kelindar/talaria/internal/ingress/nats/jetstream" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" "github.com/nats-io/nats.go" ) const ( - ctxTag = "NATS" + ctxTag = "NATS" + tableNameHeader = "table" ) type Ingress struct { - // jetstream exposed interface. - jetstream JetstreamI - monitor monitor.Monitor - conn *nats.Conn - queue chan *nats.Msg - cancel context.CancelFunc + JSClient jetstream.Client + monitor monitor.Monitor + cancel context.CancelFunc + split []splitWriter } -type jetstream struct { - // The name of the queue group. - queue string // The name of subject listening to. - subject string - // The jetstream context which provide jetstream api. - jsContext nats.JetStreamContext -} +type Event map[string]interface{} -type JetstreamI interface { - // Subscribe to defined subject from Nats server. - Subscribe(handler nats.MsgHandler) (*nats.Subscription, error) - Publish(msg []byte) (nats.PubAckFuture, error) +type splitWriter struct { + subject string + table string + queueGroup string + queue chan *nats.Msg } -type Event map[string]interface{} - // New create new ingestion from nats jetstream to sinks. func New(conf *config.NATS, monitor monitor.Monitor) (*Ingress, error) { - nc, err := nats.Connect(fmt.Sprintf("%s:%d", conf.Host, conf.Port), nats.ReconnectHandler(func(_ *nats.Conn) { - log.Println("Successfully renonnect") - }), nats.ClosedHandler(func(nc *nats.Conn) { - log.Printf("Connection close due to %q", nc.LastError()) - monitor.Error(nc.LastError()) - }), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { - log.Printf("Got disconnected. 
Reason: %q\n", nc.LastError()) - monitor.Error(nc.LastError()) - }), nats.ErrorHandler(natsErrHandler)) + jsClient, err := jetstream.New(conf, monitor) if err != nil { return nil, err } - js, err := NewJetStream(conf.Subject, conf.Queue, nc) - if err != nil { - return nil, err + split := make([]splitWriter, len(conf.Split)) + for i, s := range conf.Split { + split[i] = splitWriter{subject: s.Subject, table: s.Table, queue: make(chan *nats.Msg, 100), queueGroup: s.QueueGroup} } return &Ingress{ - jetstream: js, - monitor: monitor, - conn: nc, - queue: make(chan *nats.Msg, 100), - }, nil -} - -func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) { - log.Printf("error: %v\n", natsErr) - if natsErr == nats.ErrSlowConsumer { - pendingMsgs, _, err := sub.Pending() - if err != nil { - log.Printf("couldn't get pending messages: %v", err) - return - } - log.Printf("Falling behind with %d pending messages on subject %q.\n", - pendingMsgs, sub.Subject) - } -} - -// NewJetStream create Jetstream context -func NewJetStream(subject, queue string, nc *nats.Conn) (*jetstream, error) { - js, err := nc.JetStream() - if err != nil { - return nil, err - } - return &jetstream{ - subject: subject, - queue: queue, - jsContext: js, + JSClient: *jsClient, + monitor: monitor, + split: split, }, nil } -// Subscribe to a subject in nats server -func (s *jetstream) Subscribe(handler nats.MsgHandler) (*nats.Subscription, error) { - // Queuesubscribe automatically create ephemeral push based consumer with queue group defined. - sb, err := s.jsContext.QueueSubscribe(s.subject, s.queue, handler) - if err != nil { - return nil, err - } - // set higher pending limits - sb.SetPendingLimits(65536, (1<<18)*1024) - _, b, _ := sb.PendingLimits() - log.Println("nats: maximum pending limits (bytes): ", b) - return sb, nil -} - -// Publish message to the subject in nats server -func (s *jetstream) Publish(msg []byte) (nats.PubAckFuture, error) { - p, err := s.jsContext.PublishAsync(s.subject, msg) - if err != nil { - return nil, err - } - return p, nil -} - // SubsribeHandler subscribes to specific subject and unmarshal the message into talaria's event type. // The event message then will be used as the input of the handler function defined. -func (i *Ingress) SubsribeHandler(handler func(b []map[string]interface{})) error { - _, err := i.jetstream.Subscribe(func(msg *nats.Msg) { - block := make([]map[string]interface{}, 0) - if err := json.Unmarshal(msg.Data, &block); err != nil { - i.monitor.Error(errors.Internal("nats: unable to unmarshal", err)) +func (i *Ingress) SubsribeHandler(handler func(b []map[string]interface{}, table string)) error { + for _, s := range i.split { + // Queuesubscribe automatically create ephemeral push based consumer with queue group defined. + sb, err := i.JSClient.Context.QueueSubscribe(s.subject, s.queueGroup, func(msg *nats.Msg) { + block := make([]map[string]interface{}, 0) + if err := json.Unmarshal(msg.Data, &block); err != nil { + i.monitor.Error(errors.Internal("nats: unable to unmarshal", err)) + } + i.monitor.Count1(ctxTag, "NATS.subscribe.count") + handler(block, s.table) + }) + if err != nil { + i.monitor.Error(err) } - i.monitor.Count1(ctxTag, "NATS.subscribe.count") - handler(block) - }) - if err != nil { - return err + // set higher pending limits + sb.SetPendingLimits(65536, (1<<18)*1024) + i.monitor.Info(fmt.Sprintf("%s->%s split created", s.subject, s.table)) } return nil } // SubscribeHandlerWithPool process the message concurrently using goroutine pool. 
// The message will be asynchornously executed to reduce the message process time to avoid being slow consumer. -func (i *Ingress) SubsribeHandlerWithPool(ctx context.Context, handler func(b []map[string]interface{})) error { +func (i *Ingress) SubsribeHandlerWithPool(ctx context.Context, handler func(b []map[string]interface{}, split string)) error { // Initialze pool ctx, cancel := context.WithCancel(ctx) i.cancel = cancel i.initializeMemoryPool(ctx, handler) - _, err := i.jetstream.Subscribe(func(msg *nats.Msg) { - // Send the message to the queue - i.queue <- msg - }) - if err != nil { - return err + for _, s := range i.split { + queue := s.queue + _, err := i.JSClient.Context.QueueSubscribe(s.subject, s.queueGroup, func(msg *nats.Msg) { + queue <- msg + }) + if err != nil { + i.monitor.Error(fmt.Errorf("nats: queue subscribe error %v", err)) + continue + } + i.monitor.Info(fmt.Sprintf("%s->%s split created", s.subject, s.table)) } return nil } // Initialze memory pool for fixed number of goroutine to process the message -func (i *Ingress) initializeMemoryPool(ctx context.Context, handler func(b []map[string]interface{})) { - for n := 0; n < 100; n++ { - go func(n int, ctx context.Context, queue chan *nats.Msg) { - for { - select { - case <-ctx.Done(): - return - case msg := <-queue: - // Wait for the message - block := make([]map[string]interface{}, 0) - if err := json.Unmarshal(msg.Data, &block); err != nil { - i.monitor.Error(errors.Internal("nats: unable to unmarshal", err)) +func (i *Ingress) initializeMemoryPool(ctx context.Context, handler func(b []map[string]interface{}, split string)) { + for _, s := range i.split { + for n := 0; n < 100; n++ { + go func(n int, ctx context.Context, queue chan *nats.Msg, table string, sub string) { + for { + select { + case <-ctx.Done(): + return + case msg := <-queue: + // Wait for the message + block := make([]map[string]interface{}, 0) + if err := json.Unmarshal(msg.Data, &block); err != nil { + i.monitor.Error(errors.Internal("nats: unable to unmarshal", err)) + } + i.monitor.Count1(ctxTag, "NATS.msg.count") + // asynchornously execute handler to reduce the message process time to avoid being slow consumer. + handler(block, table) } - i.monitor.Count1(ctxTag, "NATS.subscribe.count") - // asynchornously execute handler to reduce the message process time to avoid being slow consumer. 
- handler(block) } - } - }(n, ctx, i.queue) + }(n, ctx, s.queue, s.table, s.subject) + } } } // Close ingress func (i *Ingress) Close() { - i.conn.Close() + i.JSClient.Server.Close() i.cancel() return } diff --git a/internal/ingress/nats/nats_test.go b/internal/ingress/nats/nats_test.go index b3b2c860..09ad57a6 100644 --- a/internal/ingress/nats/nats_test.go +++ b/internal/ingress/nats/nats_test.go @@ -3,15 +3,16 @@ package nats import ( "context" "encoding/json" - "fmt" "os" "testing" "time" "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/config/env" + "github.com/kelindar/talaria/internal/config/s3" "github.com/kelindar/talaria/internal/config/static" "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" "github.com/nats-io/nats-server/v2/server" natsserver "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" @@ -37,56 +38,22 @@ func TestLoadNatsConfig(t *testing.T) { const refreshTime = 50 * time.Millisecond const waitTime = 100 * time.Millisecond nats := &config.NATS{ - Host: "nats://127.0.0.1", - Port: TEST_PORT, - Subject: "event.talaria", - Queue: "talarias", + Host: "nats://127.0.0.1", + Port: TEST_PORT, + Split: []config.SplitWriter{ + {Subject: "event.talaria", QueueGroup: "talarias", Table: "event"}, + }, } + os.Setenv("NATS_URI", "file:///nats_test_config.yaml") - os.Setenv("TALARIA_WRITERS_NATS_HOST", "nats://127.0.0.1") - os.Setenv("TALARIA_WRITERS_NATS_PORT", fmt.Sprint(TEST_PORT)) - os.Setenv("TALARIA_WRITERS_NATS_SUBJECT", "event.talaria") - os.Setenv("TALARIA_WRITERS_NATS_QUEUE", "talarias") + s3Configurer := s3.New(logging.NewStandard()) + cfg := config.Load(context.Background(), refreshTime, static.New(), env.New("NATS"), s3Configurer) - cfg := config.Load(context.Background(), refreshTime, static.New(), env.New("TALARIA")) assert.Equal(t, nats, cfg().Writers.NATS) conf = *cfg().Writers.NATS } -func TestSubscribe(t *testing.T) { - s := RunServerOnPort(int(conf.Port)) - defer s.Shutdown() - - ingress, err := New(&conf, monitor.NewNoop()) - assert.NoError(t, err) - assert.NotNil(t, ingress) - - //Create stream - jsCtx, err := ingress.conn.JetStream() - assert.Nil(t, err) - - // Delete stream first in case exists - jsCtx.DeleteStream("events") - - info, err := jsCtx.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}) - assert.NoError(t, err) - assert.NotNil(t, info) - - dataCn := make(chan string) - _, err = ingress.jetstream.Subscribe(func(msg *nats.Msg) { - dataCn <- string(msg.Data) - }) - assert.NoError(t, err) - - p, err := ingress.jetstream.Publish([]byte("test")) - assert.NotNil(t, p) - assert.Nil(t, err) - - data := <-dataCn - assert.NotEmpty(t, data) -} - func TestSubscribeHandler(t *testing.T) { s := RunServerOnPort(int(conf.Port)) @@ -96,28 +63,29 @@ func TestSubscribeHandler(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, ingress) - //Create stream - jsCtx, err := ingress.conn.JetStream() - assert.Nil(t, err) - // Delete stream first in case exists - jsCtx.DeleteStream("events") + ingress.JSClient.Context.DeleteStream("events") - info, err := jsCtx.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}) + info, err := ingress.JSClient.Context.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}) assert.NoError(t, err) assert.NotNil(t, info) dataCn := make(chan []map[string]interface{}) - ingress.SubsribeHandler(func(block []map[string]interface{}) { + ingress.SubsribeHandler(func(block 
[]map[string]interface{}, table string) { dataCn <- block + assert.Equal(t, conf.Split[0].Table, table) }) test := []map[string]interface{}{{ "event": "event1", "text": "hi", }} + + // Publish message + msg := nats.NewMsg("event.talaria") b, _ := json.Marshal(test) + msg.Data = b - p, err := ingress.jetstream.Publish(b) + p, err := ingress.JSClient.Context.PublishMsg(msg) assert.NotNil(t, p) assert.Nil(t, err) @@ -134,29 +102,30 @@ func TestSubscribeHandlerWithPool(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, ingress) - //Create stream - jsCtx, err := ingress.conn.JetStream() - assert.Nil(t, err) - // Delete stream first in case exists - jsCtx.DeleteStream("events") + ingress.JSClient.Context.DeleteStream("events") - info, err := jsCtx.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}) + info, err := ingress.JSClient.Context.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}) assert.NoError(t, err) assert.NotNil(t, info) dataCn := make(chan []map[string]interface{}) ctx, cancel := context.WithCancel(context.Background()) - ingress.SubsribeHandlerWithPool(ctx, func(block []map[string]interface{}) { + ingress.SubsribeHandlerWithPool(ctx, func(block []map[string]interface{}, table string) { dataCn <- block + assert.Equal(t, conf.Split[0].Table, table) }) test := []map[string]interface{}{{ "event": "event1", "text": "hi", }} + + // Publish message + msg := nats.NewMsg("event.talaria") b, _ := json.Marshal(test) + msg.Data = b - p, err := ingress.jetstream.Publish(b) + p, err := ingress.JSClient.Context.PublishMsg(msg) assert.NotNil(t, p) assert.Nil(t, err) diff --git a/internal/ingress/nats/nats_test_config.yaml b/internal/ingress/nats/nats_test_config.yaml new file mode 100644 index 00000000..21238eba --- /dev/null +++ b/internal/ingress/nats/nats_test_config.yaml @@ -0,0 +1,8 @@ +writers: + nats: + host: nats://127.0.0.1 + port: 8369 + split: + - subject: event.talaria + queueGroup: talarias + table: event diff --git a/internal/server/server.go b/internal/server/server.go index 6e7d24f7..8c12ac53 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -185,13 +185,14 @@ func (s *Server) subscribeToJetStream(conf *config.Config) (err error) { // Start ingesting s.monitor.Info("server: starting ingestion from nats") - s.nats.SubsribeHandlerWithPool(context.Background(), func(block []map[string]interface{}) { + s.nats.SubsribeHandlerWithPool(context.Background(), func(block []map[string]interface{}, table string) { data := strings.NewEncoder().Encode(block) - if _, err := s.Ingest(context.Background(), &talaria.IngestRequest{ - Data: &talaria.IngestRequest_Batch{ + if _, err := s.IngestWithTable(context.Background(), &talaria.IngestWithTableRequest{ + Data: &talaria.IngestWithTableRequest_Batch{ Batch: data, }, + Tables: []string{table}, }); err != nil { s.monitor.Warning(err) } diff --git a/internal/server/server_ingest.go b/internal/server/server_ingest.go index 57d05a0c..7c0770b4 100644 --- a/internal/server/server_ingest.go +++ b/internal/server/server_ingest.go @@ -80,7 +80,6 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t } } s.monitor.Count("server", fmt.Sprintf("%s.ingest.row.count", t.Name()), int64(rowCount)) - s.monitor.Count("server", fmt.Sprintf("%s.ingest.count", t.Name()), int64(len(blocks))) } return nil, nil @@ -122,6 +121,7 @@ func (s *Server) IngestWithTable(ctx context.Context, request *talaria.IngestWit return nil, errors.Internal("unable to read the block", err) 
} + rowCount := 0 // If table supports streaming, then stream if streamer, ok := t.(storage.Streamer); ok { s := stream.Publish(streamer, s.monitor) @@ -131,6 +131,7 @@ func (s *Server) IngestWithTable(ctx context.Context, request *talaria.IngestWit if err != nil { return nil, err } + rowCount += len(rows) for _, row := range rows { _, err := s(row) if err != nil { @@ -149,7 +150,7 @@ func (s *Server) IngestWithTable(ctx context.Context, request *talaria.IngestWit } } - s.monitor.Count("server", fmt.Sprintf("%s.ingestWithTable.count", t.Name()), int64(len(blocks))) + s.monitor.Count("server", fmt.Sprintf("%s.ingestWithTable.count", t.Name()), int64(rowCount)) } return nil, nil
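
Note: the sketch below is a minimal producer-side example of the new split routing, not part of the patch. It shows how a JSON batch published to one of the configured split subjects is picked up by the new ingress and ingested into the mapped table. The subject ("event.talaria"), queue group ("talarias"), table ("event"), stream ("events" covering "event.>"), and port 8369 are taken from nats_test_config.yaml and the tests in this patch; the standalone main() wrapper and the connection URL are illustrative assumptions. All NATS calls used here (Connect, JetStream, AddStream, NewMsg, PublishMsg) appear in the patch itself.

package main

import (
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to the same server the writers.nats config points at
	// (host/port here mirror the test config; adjust for your deployment).
	nc, err := nats.Connect("nats://127.0.0.1:8369")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Make sure a stream covers the split subjects, as the tests do with
	// AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}).
	if _, err := js.AddStream(&nats.StreamConfig{Name: "events", Subjects: []string{"event.>"}}); err != nil {
		log.Fatal(err)
	}

	// The ingress unmarshals each message body as a JSON array of rows.
	batch := []map[string]interface{}{{"event": "event1", "text": "hi"}}
	b, _ := json.Marshal(batch)

	msg := nats.NewMsg("event.talaria")
	msg.Data = b
	if _, err := js.PublishMsg(msg); err != nil {
		log.Fatal(err)
	}
	// Talaria's consumer on "event.talaria" (queue group "talarias") will
	// receive this batch via SubsribeHandlerWithPool and ingest it into the
	// "event" table through IngestWithTable.
}

Each split entry gets its own queue channel and goroutine pool on the consumer side, so a slow table does not turn the shared subscription into a slow consumer.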