Skip to content

Commit

Permalink
Add Kit readme (#19)
Browse files Browse the repository at this point in the history
* Update nats pubsub

* Add Kit Readme
  • Loading branch information
anthonycorbacho authored Mar 24, 2023
1 parent a235ed9 commit 5749291
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 106 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/kit.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ on:
- "kit/**/*.go"

env:
TESTINGDB_URL: postgresql://postgres:secret@localhost:5432/postgres
TESTINGREDIS_URL: ":6379"
TESTINGNATS_URL: "nats://127.0.0.1:4222"
PUBSUB_EMULATOR_HOST: ":8085"

jobs:
test:
Expand Down
47 changes: 47 additions & 0 deletions kit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Kit Framework
Kit is a framework for building production grade scalable service applications that can run in Kubernetes.

The main goal of Kit is to provide a proven starting point that reduces the repetitive tasks required for a new project.
It is designed to follow (at least try to) idiomatic code and Go best practices. Collectively,
the project lays out everything logically to minimize guess work and enable engineers to quickly maintain a mental model for the project.

## Starting point

Starting building a new kit service is as simple as

```go
// Main function.
// Everything start from here.
func main() {
podName := config.LookupEnv("POD_NAME", id.NewGenerator("myapp").Generate())
ctx := context.Background()

// Initiate a logger with pre-configuration for production and telemetry.
l, err := log.New()
if err != nil {
// in case we cannot create the logger, the app should immediately stop.
panic(err)
}
// Replace the global logger with the Service scoped log.
log.ReplaceGlobal(l)

// Initialise the foundation and start the service
foundation, err := kit.NewFoundation(podName, kit.WithLogger(l))
if err != nil {
l.Fatal(ctx, err.Error())
}

l.Info(ctx, "Starting service", log.String("pod.name", podName))

// Start the service
//
// This service will be automatically configured to:
// 1. Provide Observability information such as tracing, loging and metric
// 2. Provide default /readyz and /healthz endpoint for readiness and liveness probe and profiling via /debug/pprof
// 3. Setup for production setup
// 4. Graceful shutdown
if err := foundation.Serve(); err != nil {
l.Error(ctx, "fail serving", log.Error(err))
}
}
```
4 changes: 2 additions & 2 deletions kit/cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func (c *Cache) Set(ctx context.Context, key string, value interface{}, expirati
}

if value == nil {
return errors.Wrap(cache.ErrValueInvalid, "value should not be nil when putting to cache")
return cache.ErrValueInvalid
}

b, err := cache.Marshal(value)
if err != nil {
return errors.Wrapf(err, "saving value to cache for key '%s'", key)
return errors.Wrapf(err, "marshalling value for key '%s'", key)
}

if err := c.client.Set(ctx, key, b, expiration).Err(); err != nil {
Expand Down
83 changes: 0 additions & 83 deletions kit/pubsub/nats/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,86 +228,3 @@ func (n *natsTestSuite) TestClosedStates() {
err = p.Publish(ctx, testClosingSubject, []byte("test closed publisher"))
assert.Error(n.T(), pubsub.PublisherClosed, err)
}

func (n *natsTestSuite) TestInvalidArguments_Subscriber() {
m := make(map[string]interface{})
m["queueGroup"] = test
m["consumer"] = n.consumer
m["nc"] = n.nc
m["js"] = n.js

tt := []struct {
name string
queueGroup string
nc *nats.Conn
js nats.JetStreamContext
consumer *nats.ConsumerInfo
}{
{
name: "queueGroup",
queueGroup: "",
nc: n.nc,
js: n.js,
consumer: n.consumer,
},
{
name: "nc",
queueGroup: test,
nc: nil,
js: n.js,
consumer: n.consumer,
},
{
name: "js",
queueGroup: test,
nc: n.nc,
js: nil,
consumer: n.consumer,
},
{
name: "consumer",
queueGroup: test,
nc: n.nc,
js: n.js,
consumer: nil,
},
}
for _, test := range tt {
test := test
tf := func(t *testing.T) {
_, err := NewSubscriber(test.queueGroup, test.nc, test.js, test.consumer)
assert.NoError(n.T(), err)
}
n.T().Run(test.name, tf)
}
}
func (n *natsTestSuite) TestInvalidArguments_Publisher() {
m := make(map[string]interface{})
m["nc"] = n.nc
m["js"] = n.js

tt := []struct {
name string
nc *nats.Conn
js nats.JetStreamContext
}{
{
name: "nc",
nc: nil,
js: n.js,
},
{
name: "js",
nc: n.nc,
js: nil,
},
}
for _, test := range tt {
test := test
tf := func(t *testing.T) {
_, err := NewPublisher(test.nc, test.js)
assert.NoError(n.T(), err)
}
n.T().Run(test.name, tf)
}
}
6 changes: 4 additions & 2 deletions kit/pubsub/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"go.opentelemetry.io/otel/trace"
)

var _ pubsub.Publisher = (*Publisher)(nil)

// Publisher publishes a message on a NATS JetStream Stream's Pub/Sub topic.
// We don't use NATS core but JetStream as our use cases require persistence and better quality of service.
//
// Subjects (topics) are managed by the server automatically following presence/absence of subscriptions
// https://docs.nats.io/reference/faq#how-do-i-create-subjects
//
Expand All @@ -24,7 +26,7 @@ type Publisher struct {
js nats.JetStreamContext
}

// NewPublisher create a new GCP publisher.
// NewPublisher create a new Nats JetStream publisher.
//
// It required a call to Close in order to stop processing messages and close topic connections.
func NewPublisher(nc *nats.Conn, js nats.JetStreamContext) (*Publisher, error) {
Expand Down
91 changes: 72 additions & 19 deletions kit/pubsub/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nats
import (
"context"
"fmt"
"sync"

"github.com/anthonycorbacho/workspace/kit/errors"
"github.com/anthonycorbacho/workspace/kit/pubsub"
Expand All @@ -21,16 +22,13 @@ var _ pubsub.Subscriber = (*Subscriber)(nil)
// The following features are available our of the box:
// - automatic reconnection: https://docs.nats.io/using-nats/developer/connecting/reconnect
type Subscriber struct {
// As we are likely to use the `Push + queue group` scenario
// this design implies that a subscriber corresponds to a unique queue group
// but may have multiple subscriptions
closing chan struct{}
closed bool
closedLock sync.Mutex
queueGroup string
// A consumer could be created by the action of subscribing,
// but draining that subscription would also remove the consumer.
// So for more control, it's better to create Consumers independently
consumer *nats.ConsumerInfo
nc *nats.Conn
js nats.JetStreamContext
consumer *nats.ConsumerInfo
nc *nats.Conn
js nats.JetStreamContext
}

// NewSubscriber creates a new Nats Subscriber.
Expand All @@ -51,6 +49,9 @@ func NewSubscriber(queueGroup string, natsClient *nats.Conn, jetStreamCtx nats.J
}

return &Subscriber{
closing: make(chan struct{}, 1),
closed: false,
closedLock: sync.Mutex{},
queueGroup: queueGroup,
nc: natsClient,
js: jetStreamCtx,
Expand All @@ -63,6 +64,12 @@ func NewSubscriber(queueGroup string, natsClient *nats.Conn, jetStreamCtx nats.J
// It is caller's responsibility to configure client's connection's `DrainTimeout` and `ClosedHandler` (with WaitGroup)
// https://docs.nats.io/using-nats/developer/receiving/drain
func (s *Subscriber) Close() error {
if s.isClosed() {
return nil
}
s.setClosed(true)
close(s.closing)

if s.nc.IsClosed() {
return pubsub.SubscriberCLosed
}
Expand All @@ -72,37 +79,61 @@ func (s *Subscriber) Close() error {
// Subscribe consumes NATS Pub/Sub.
//
// NATS has two types of subscription: Pull and Push.
// ADA use-case seems to fit the `Push + queue group` scenario, which is why here we `QueueSubscribe“
//
// Read more about it https://docs.nats.io/reference/faq#what-is-the-right-kind-of-stream-consumer-to-use
//
// IMPORTANT! Don't forget to filter messages on the consumer as subscriber's subscription doesn't seem to take priority.
// Depending on the Consumer `DeliverPolicy`, `all`, `last`, `new`, `by_start_time`, `by_start_sequence`
// persisted messages can be received
func (s *Subscriber) Subscribe(ctx context.Context, subscription string /* subject */, handler pubsub.Handler) error {
h := func(ctx context.Context, msg pubsub.Message, ack func(), nack func()) error {
// default behavior is to always ack.
ack()
return handler(ctx, msg)
}

return s.SubscribeWithAck(ctx, subscription, h)
}

func (s *Subscriber) SubscribeWithAck(ctx context.Context, subscription string /* subject */, handler pubsub.HandlerWithAck) error {
if s.nc.IsClosed() {
return fmt.Errorf("subscriber is closed")
}
if len(subscription) == 0 {
return fmt.Errorf("subscription is nil")
}

// exponential Backoff needed?
_, err := s.js.QueueSubscribe(subscription /* subject */, s.queueGroup, func(msg *nats.Msg) {
go s.receive(ctx, msg, handler)
}, nats.Bind(s.consumer.Stream, s.consumer.Name))
subHandler := func(msg *nats.Msg) {
s.receive(ctx, msg, handler)
}

_, err := s.js.QueueSubscribe(
subscription, /* subject */
s.queueGroup,
subHandler,
nats.Bind(s.consumer.Stream, s.consumer.Name),
nats.ManualAck())
if err != nil {

return fmt.Errorf("subscription init failed: %v", err)
}

return nil
}

func (s *Subscriber) SubscribeWithAck(ctx context.Context, subscription string /* subject */, handler pubsub.HandlerWithAck) error {
return errors.New("not implemented")
}
func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub.HandlerWithAck) {

select {
case <-s.closing:
msg.Nak()
return
case <-ctx.Done():
msg.Nak()
return
default:
// no-oop: responsibility of the caller
}

func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub.Handler) {
// recreate the context with traces
firstHeaders := make(map[string]string)
for k, v := range msg.Header {
Expand All @@ -119,10 +150,32 @@ func (s *Subscriber) receive(ctx context.Context, msg *nats.Msg, handler pubsub.
span.SetAttributes(attribute.String("topic", msg.Subject))
defer span.End()

ack := func() {
msg.Ack()
}
nack := func() {
msg.Nak()
}

// Process the message
// in case of error, we record and label the error in the span.
if err := handler(ctx, msg.Data); err != nil {
err := handler(ctx, msg.Data, ack, nack)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}

func (s *Subscriber) setClosed(value bool) {
s.closedLock.Lock()
defer s.closedLock.Unlock()

s.closed = value
}

func (s *Subscriber) isClosed() bool {
s.closedLock.Lock()
defer s.closedLock.Unlock()

return s.closed
}

0 comments on commit 5749291

Please sign in to comment.