diff --git a/.github/workflows/kit.test.yaml b/.github/workflows/kit.test.yaml index 16decae..86eef28 100644 --- a/.github/workflows/kit.test.yaml +++ b/.github/workflows/kit.test.yaml @@ -6,7 +6,10 @@ on: - "kit/**/*.go" env: + TESTINGDB_URL: postgresql://postgres:secret@localhost:5432/postgres TESTINGREDIS_URL: ":6379" + TESTINGNATS_URL: "nats://127.0.0.1:4222" + PUBSUB_EMULATOR_HOST: ":8085" jobs: test: diff --git a/kit/README.md b/kit/README.md new file mode 100644 index 0000000..3af2c8f --- /dev/null +++ b/kit/README.md @@ -0,0 +1,47 @@ +# Kit Framework +Kit is a framework for building production grade scalable service applications that can run in Kubernetes. + +The main goal of Kit is to provide a proven starting point that reduces the repetitive tasks required for a new project. +It is designed to follow (at least try to) idiomatic code and Go best practices. Collectively, +the project lays out everything logically to minimize guess work and enable engineers to quickly maintain a mental model for the project. + +## Starting point + +Starting building a new kit service is as simple as + +```go +// Main function. +// Everything start from here. +func main() { + podName := config.LookupEnv("POD_NAME", id.NewGenerator("myapp").Generate()) + ctx := context.Background() + + // Initiate a logger with pre-configuration for production and telemetry. + l, err := log.New() + if err != nil { + // in case we cannot create the logger, the app should immediately stop. + panic(err) + } + // Replace the global logger with the Service scoped log. + log.ReplaceGlobal(l) + + // Initialise the foundation and start the service + foundation, err := kit.NewFoundation(podName, kit.WithLogger(l)) + if err != nil { + l.Fatal(ctx, err.Error()) + } + + l.Info(ctx, "Starting service", log.String("pod.name", podName)) + + // Start the service + // + // This service will be automatically configured to: + // 1. Provide Observability information such as tracing, loging and metric + // 2. Provide default /readyz and /healthz endpoint for readiness and liveness probe and profiling via /debug/pprof + // 3. Setup for production setup + // 4. Graceful shutdown + if err := foundation.Serve(); err != nil { + l.Error(ctx, "fail serving", log.Error(err)) + } +} +``` \ No newline at end of file diff --git a/kit/cache/redis/redis.go b/kit/cache/redis/redis.go index 85bcd89..f6c7a3b 100644 --- a/kit/cache/redis/redis.go +++ b/kit/cache/redis/redis.go @@ -129,12 +129,12 @@ func (c *Cache) Set(ctx context.Context, key string, value interface{}, expirati } if value == nil { - return errors.Wrap(cache.ErrValueInvalid, "value should not be nil when putting to cache") + return cache.ErrValueInvalid } b, err := cache.Marshal(value) if err != nil { - return errors.Wrapf(err, "saving value to cache for key '%s'", key) + return errors.Wrapf(err, "marshalling value for key '%s'", key) } if err := c.client.Set(ctx, key, b, expiration).Err(); err != nil { diff --git a/kit/pubsub/nats/integration_test.go b/kit/pubsub/nats/integration_test.go index 8a8bdf8..8be11a6 100644 --- a/kit/pubsub/nats/integration_test.go +++ b/kit/pubsub/nats/integration_test.go @@ -228,86 +228,3 @@ func (n *natsTestSuite) TestClosedStates() { err = p.Publish(ctx, testClosingSubject, []byte("test closed publisher")) assert.Error(n.T(), pubsub.PublisherClosed, err) } - -func (n *natsTestSuite) TestInvalidArguments_Subscriber() { - m := make(map[string]interface{}) - m["queueGroup"] = test - m["consumer"] = n.consumer - m["nc"] = n.nc - m["js"] = n.js - - tt := []struct { - name string - queueGroup string - nc *nats.Conn - js nats.JetStreamContext - consumer *nats.ConsumerInfo - }{ - { - name: "queueGroup", - queueGroup: "", - nc: n.nc, - js: n.js, - consumer: n.consumer, - }, - { - name: "nc", - queueGroup: test, - nc: nil, - js: n.js, - consumer: n.consumer, - }, - { - name: "js", - queueGroup: test, - nc: n.nc, - js: nil, - consumer: n.consumer, - }, - { - name: "consumer", - queueGroup: test, - nc: n.nc, - js: n.js, - consumer: nil, - }, - } - for _, test := range tt { - test := test - tf := func(t *testing.T) { - _, err := NewSubscriber(test.queueGroup, test.nc, test.js, test.consumer) - assert.NoError(n.T(), err) - } - n.T().Run(test.name, tf) - } -} -func (n *natsTestSuite) TestInvalidArguments_Publisher() { - m := make(map[string]interface{}) - m["nc"] = n.nc - m["js"] = n.js - - tt := []struct { - name string - nc *nats.Conn - js nats.JetStreamContext - }{ - { - name: "nc", - nc: nil, - js: n.js, - }, - { - name: "js", - nc: n.nc, - js: nil, - }, - } - for _, test := range tt { - test := test - tf := func(t *testing.T) { - _, err := NewPublisher(test.nc, test.js) - assert.NoError(n.T(), err) - } - n.T().Run(test.name, tf) - } -} diff --git a/kit/pubsub/nats/publisher.go b/kit/pubsub/nats/publisher.go index 9af4ec3..d35c6f2 100644 --- a/kit/pubsub/nats/publisher.go +++ b/kit/pubsub/nats/publisher.go @@ -13,8 +13,10 @@ import ( "go.opentelemetry.io/otel/trace" ) +var _ pubsub.Publisher = (*Publisher)(nil) + // Publisher publishes a message on a NATS JetStream Stream's Pub/Sub topic. -// We don't use NATS core but JetStream as our use cases require persistence and better quality of service. +// // Subjects (topics) are managed by the server automatically following presence/absence of subscriptions // https://docs.nats.io/reference/faq#how-do-i-create-subjects // @@ -24,7 +26,7 @@ type Publisher struct { js nats.JetStreamContext } -// NewPublisher create a new GCP publisher. +// NewPublisher create a new Nats JetStream publisher. // // It required a call to Close in order to stop processing messages and close topic connections. func NewPublisher(nc *nats.Conn, js nats.JetStreamContext) (*Publisher, error) { diff --git a/kit/pubsub/nats/subscriber.go b/kit/pubsub/nats/subscriber.go index 3dc9fed..2f20360 100644 --- a/kit/pubsub/nats/subscriber.go +++ b/kit/pubsub/nats/subscriber.go @@ -3,6 +3,7 @@ package nats import ( "context" "fmt" + "sync" "github.com/anthonycorbacho/workspace/kit/errors" "github.com/anthonycorbacho/workspace/kit/pubsub" @@ -21,16 +22,13 @@ var _ pubsub.Subscriber = (*Subscriber)(nil) // The following features are available our of the box: // - automatic reconnection: https://docs.nats.io/using-nats/developer/connecting/reconnect type Subscriber struct { - // As we are likely to use the `Push + queue group` scenario - // this design implies that a subscriber corresponds to a unique queue group - // but may have multiple subscriptions + closing chan struct{} + closed bool + closedLock sync.Mutex queueGroup string - // A consumer could be created by the action of subscribing, - // but draining that subscription would also remove the consumer. - // So for more control, it's better to create Consumers independently - consumer *nats.ConsumerInfo - nc *nats.Conn - js nats.JetStreamContext + consumer *nats.ConsumerInfo + nc *nats.Conn + js nats.JetStreamContext } // NewSubscriber creates a new Nats Subscriber. @@ -51,6 +49,9 @@ func NewSubscriber(queueGroup string, natsClient *nats.Conn, jetStreamCtx nats.J } return &Subscriber{ + closing: make(chan struct{}, 1), + closed: false, + closedLock: sync.Mutex{}, queueGroup: queueGroup, nc: natsClient, js: jetStreamCtx, @@ -63,6 +64,12 @@ func NewSubscriber(queueGroup string, natsClient *nats.Conn, jetStreamCtx nats.J // It is caller's responsibility to configure client's connection's `DrainTimeout` and `ClosedHandler` (with WaitGroup) // https://docs.nats.io/using-nats/developer/receiving/drain func (s *Subscriber) Close() error { + if s.isClosed() { + return nil + } + s.setClosed(true) + close(s.closing) + if s.nc.IsClosed() { return pubsub.SubscriberCLosed } @@ -72,7 +79,6 @@ func (s *Subscriber) Close() error { // Subscribe consumes NATS Pub/Sub. // // NATS has two types of subscription: Pull and Push. -// ADA use-case seems to fit the `Push + queue group` scenario, which is why here we `QueueSubscribe“ // // Read more about it https://docs.nats.io/reference/faq#what-is-the-right-kind-of-stream-consumer-to-use // @@ -80,6 +86,16 @@ func (s *Subscriber) Close() error { // Depending on the Consumer `DeliverPolicy`, `all`, `last`, `new`, `by_start_time`, `by_start_sequence` // persisted messages can be received func (s *Subscriber) Subscribe(ctx context.Context, subscription string /* subject */, handler pubsub.Handler) error { + h := func(ctx context.Context, msg pubsub.Message, ack func(), nack func()) error { + // default behavior is to always ack. + ack() + return handler(ctx, msg) + } + + return s.SubscribeWithAck(ctx, subscription, h) +} + +func (s *Subscriber) SubscribeWithAck(ctx context.Context, subscription string /* subject */, handler pubsub.HandlerWithAck) error { if s.nc.IsClosed() { return fmt.Errorf("subscriber is closed") } @@ -87,22 +103,37 @@ func (s *Subscriber) Subscribe(ctx context.Context, subscription string /* subje return fmt.Errorf("subscription is nil") } - // exponential Backoff needed? - _, err := s.js.QueueSubscribe(subscription /* subject */, s.queueGroup, func(msg *nats.Msg) { - go s.receive(ctx, msg, handler) - }, nats.Bind(s.consumer.Stream, s.consumer.Name)) + subHandler := func(msg *nats.Msg) { + s.receive(ctx, msg, handler) + } + + _, err := s.js.QueueSubscribe( + subscription, /* subject */ + s.queueGroup, + subHandler, + nats.Bind(s.consumer.Stream, s.consumer.Name), + nats.ManualAck()) if err != nil { + return fmt.Errorf("subscription init failed: %v", err) } return nil } -func (s *Subscriber) SubscribeWithAck(ctx context.Context, subscription string /* subject */, handler pubsub.HandlerWithAck) error { - return errors.New("not implemented") -} +func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub.HandlerWithAck) { + + select { + case <-s.closing: + msg.Nak() + return + case <-ctx.Done(): + msg.Nak() + return + default: + // no-oop: responsibility of the caller + } -func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub.Handler) { // recreate the context with traces firstHeaders := make(map[string]string) for k, v := range msg.Header { @@ -119,10 +150,32 @@ func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub. span.SetAttributes(attribute.String("topic", msg.Subject)) defer span.End() + ack := func() { + msg.Ack() + } + nack := func() { + msg.Nak() + } + // Process the message // in case of error, we record and label the error in the span. - if err := handler(ctx, msg.Data); err != nil { + err := handler(ctx, msg.Data, ack, nack) + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } } + +func (s *Subscriber) setClosed(value bool) { + s.closedLock.Lock() + defer s.closedLock.Unlock() + + s.closed = value +} + +func (s *Subscriber) isClosed() bool { + s.closedLock.Lock() + defer s.closedLock.Unlock() + + return s.closed +}