diff --git a/README.md b/README.md index 63ccb49..e7ac553 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ and using the same aggregation format that [KPL][kpl-url] use. ### Useful links +- [Documentation][godoc-url] - [Aggregation format][aggregation-format-url] - [Considerations When Using KPL Aggregation][kpl-aggregation] - [Consumer De-aggregation][de-aggregation] @@ -22,13 +23,11 @@ import ( ) func main() { - log := logrus.New() client := kinesis.New(session.New(aws.NewConfig())) pr := producer.New(&producer.Config{ StreamName: "test", BacklogCount: 2000, - Client: client, - Logger: log, + Client: client }) pr.Start() @@ -55,6 +54,45 @@ func main() { } ``` +#### Specifying logger implementation +`producer.Config` takes an optional `logging.Logger` implementation. + +##### Using a custom logger +```go +customLogger := &CustomLogger{} + +&producer.Config{ + StreamName: "test", + BacklogCount: 2000, + Client: client, + Logger: customLogger, +} +``` + +#### Using logrus + +```go +import ( + "github.com/sirupsen/logrus" + producer "github.com/a8m/kinesis-producer" + "github.com/a8m/kinesis-producer/loggers" +) + +log := logrus.New() + +&producer.Config{ + StreamName: "test", + BacklogCount: 2000, + Client: client, + Logger: loggers.Logrus(log), +} +``` + +kinesis-producer ships with three logger implementations. + +- `producer.Standard` used the standard library logger +- `loggers.Logrus` uses logrus logger +- `loggers.Zap` uses zap logger ### License MIT diff --git a/config.go b/config.go index f65ae58..0bbd8d0 100644 --- a/config.go +++ b/config.go @@ -1,10 +1,11 @@ package producer import ( + "log" + "os" "time" k "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/sirupsen/logrus" ) // Constants and default configuration take from: @@ -54,8 +55,8 @@ type Config struct { // Number of requests to sent concurrently. Default to 24. MaxConnections int - // Logger is the logger used. Default to logrus.Log. - Logger logrus.FieldLogger + // Logger is the logger used. Default to producer.Logger. + Logger Logger // Enabling verbose logging. Default to false. Verbose bool @@ -67,7 +68,7 @@ type Config struct { // defaults for configuration func (c *Config) defaults() { if c.Logger == nil { - c.Logger = logrus.New() + c.Logger = &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} } if c.BatchCount == 0 { c.BatchCount = maxRecordsPerRequest diff --git a/example_test.go b/example_test.go index 0e6abcc..a3a511b 100644 --- a/example_test.go +++ b/example_test.go @@ -1,22 +1,23 @@ package producer import ( + "log" + "os" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/sirupsen/logrus" ) func ExampleSimple() { - log := logrus.New() + logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} client := kinesis.New(session.New(aws.NewConfig())) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, Client: client, - Logger: log, + Logger: logger, }) pr.Start() @@ -25,7 +26,7 @@ func ExampleSimple() { go func() { for r := range pr.NotifyFailures() { // r contains `Data`, `PartitionKey` and `Error()` - log.Error(r) + logger.Error("detected put failure", r.error) } }() @@ -33,7 +34,7 @@ func ExampleSimple() { for i := 0; i < 5000; i++ { err := pr.Put([]byte("foo"), "bar") if err != nil { - log.WithError(err).Fatal("error producing") + logger.Error("error producing", err) } } }() diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..c27dfee --- /dev/null +++ b/logger.go @@ -0,0 +1,46 @@ +package producer + +import ( + "fmt" + "log" + "strings" +) + +// Logger represents a simple interface used by kinesis-producer to handle logging +type Logger interface { + Info(msg string, values ...LogValue) + Error(msg string, err error, values ...LogValue) +} + +// LogValue represents a key:value pair used by the Logger interface +type LogValue struct { + Name string + Value interface{} +} + +func (v LogValue) String() string { + return fmt.Sprintf(" %s=%s", v.Name, v.Value) +} + +// StdLogger implements the Logger interface using standard library loggers +type StdLogger struct { + Logger *log.Logger +} + +// Info prints log message +func (l *StdLogger) Info(msg string, values ...LogValue) { + l.Logger.Print(msg, l.valuesToString(values...)) +} + +// Error prints log message +func (l *StdLogger) Error(msg string, err error, values ...LogValue) { + l.Logger.Print(msg, l.valuesToString(values...), err) +} + +func (l *StdLogger) valuesToString(values ...LogValue) string { + parts := make([]string, len(values)) + for i, v := range values { + parts[i] = fmt.Sprint(v) + } + return strings.Join(parts, ", ") +} diff --git a/loggers/kplogrus/logrus.go b/loggers/kplogrus/logrus.go new file mode 100644 index 0000000..27f6792 --- /dev/null +++ b/loggers/kplogrus/logrus.go @@ -0,0 +1,29 @@ +package kplogrus + +import ( + producer "github.com/a8m/kinesis-producer" + "github.com/sirupsen/logrus" +) + +// Logger implements a logurs.Logger logger for kinesis-producer +type Logger struct { + Logger *logrus.Logger +} + +// Info logs a message +func (l *Logger) Info(msg string, args ...producer.LogValue) { + l.Logger.WithFields(l.valuesToFields(args...)).Info(msg) +} + +// Error logs an error +func (l *Logger) Error(msg string, err error, args ...producer.LogValue) { + l.Logger.WithError(err).WithFields(l.valuesToFields(args...)).Error(msg) +} + +func (l *Logger) valuesToFields(values ...producer.LogValue) logrus.Fields { + fields := logrus.Fields{} + for _, v := range values { + fields[v.Name] = v.Value + } + return fields +} diff --git a/loggers/kpzap/zap.go b/loggers/kpzap/zap.go new file mode 100644 index 0000000..a885437 --- /dev/null +++ b/loggers/kpzap/zap.go @@ -0,0 +1,32 @@ +package kpzap + +import ( + "go.uber.org/zap" + + producer "github.com/a8m/kinesis-producer" +) + +// Logger implements a zap.Logger logger for kinesis-producer +type Logger struct { + Logger *zap.Logger +} + +// Info logs a message +func (l *Logger) Info(msg string, values ...producer.LogValue) { + l.Logger.Info(msg, l.valuesToFields(values)...) +} + +// Error logs an error +func (l *Logger) Error(msg string, err error, values ...producer.LogValue) { + fields := l.valuesToFields(values) + fields = append(fields, zap.Error(err)) + l.Logger.Info(msg, fields...) +} + +func (l *Logger) valuesToFields(values []producer.LogValue) []zap.Field { + fields := make([]zap.Field, len(values)) + for i, v := range values { + fields[i] = zap.Any(v.Name, v.Value) + } + return fields +} diff --git a/producer.go b/producer.go index c821f81..75f671a 100644 --- a/producer.go +++ b/producer.go @@ -8,10 +8,10 @@ package producer import ( "errors" + "fmt" "sync" "time" - "github.com/sirupsen/logrus" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/jpillora/backoff" ) @@ -92,7 +92,7 @@ func (p *Producer) Put(data []byte, partitionKey string) error { p.Lock() if needToDrain { if record, err = p.aggregator.Drain(); err != nil { - p.Logger.WithError(err).Error("drain aggregator") + p.Logger.Error("drain aggregator", err) } } p.aggregator.Put(data, partitionKey) @@ -129,7 +129,7 @@ func (p *Producer) NotifyFailures() <-chan *FailureRecord { // Start the producer func (p *Producer) Start() { - p.Logger.WithField("stream", p.StreamName).Info("starting producer") + p.Logger.Info("starting producer", LogValue{"stream", p.StreamName}) go p.loop() } @@ -138,7 +138,7 @@ func (p *Producer) Stop() { p.Lock() p.stopped = true p.Unlock() - p.Logger.WithField("backlog", len(p.records)).Info("stopping producer") + p.Logger.Info("stopping producer", LogValue{"backlog", len(p.records)}) // drain if record, ok := p.drainIfNeed(); ok { @@ -225,7 +225,7 @@ func (p *Producer) drainIfNeed() (*kinesis.PutRecordsRequestEntry, bool) { record, err := p.aggregator.Drain() p.Unlock() if err != nil { - p.Logger.WithError(err).Error("drain aggregator") + p.Logger.Error("drain aggregator", err) } else { return record, true } @@ -243,14 +243,14 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin defer p.semaphore.release() for { - p.Logger.WithField("reason", reason).Infof("flush %v records", len(records)) + p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)}) out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{ StreamName: &p.StreamName, Records: records, }) if err != nil { - p.Logger.WithError(err).Error("flush") + p.Logger.Error("flush", err) p.RLock() notify := p.notify p.RUnlock() @@ -262,15 +262,15 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin if p.Verbose { for i, r := range out.Records { - fields := make(logrus.Fields) + values := make([]LogValue, 2) if r.ErrorCode != nil { - fields["ErrorCode"] = *r.ErrorCode - fields["ErrorMessage"] = *r.ErrorMessage + values[0] = LogValue{"ErrorCode", *r.ErrorCode} + values[1] = LogValue{"ErrorMessage", *r.ErrorMessage} } else { - fields["ShardId"] = *r.ShardId - fields["SequenceNumber"] = *r.SequenceNumber + values[0] = LogValue{"ShardId", *r.ShardId} + values[1] = LogValue{"SequenceNumber", *r.SequenceNumber} } - p.Logger.WithFields(fields).Infof("Result[%d]", i) + p.Logger.Info(fmt.Sprintf("Result[%d]", i), values...) } } @@ -281,11 +281,11 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin duration := b.Duration() - p.Logger.WithFields(logrus.Fields{ - "failures": failed, - "backoff": duration.String(), - }).Warn("put failures") - + p.Logger.Info( + "put failures", + LogValue{"failures", failed}, + LogValue{"backoff", duration.String()}, + ) time.Sleep(duration) // change the logging state for the next itertion