Skip to content

Commit

Permalink
insert error logger
Browse files Browse the repository at this point in the history
  • Loading branch information
edwvee committed Jan 27, 2022
1 parent 8806308 commit eff78c4
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 41 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ A server for batching single inserts to databases. Gather many single or just sm
#remove if you won't profile
pprof_http_bind = "localhost:6034"

#log for insert errors (not for sync=1 requests)
#format: {"timestamp":..., "timestamp_string":..., "error": ..., "table":..., "fields":..., "rows": ...}\n
#remove or leave empty path if not needed
[insert_error_logger]
path = "error.log"
pretty_print = true

[receivers]

[receivers.first-http]
Expand All @@ -39,6 +46,7 @@ pprof_http_bind = "localhost:6034"
#use this type for clickhouse
type = "clickhouse"
#connection string (look here https://github.com/ClickHouse/clickhouse-go#dsn)
#use native tcp interface, not http or mysql
dsn = "tcp://localhost:9000?user=default"
#maximum simultaneous connections (treat like maximum simultaneous queries)
max_connections = 2
Expand Down
7 changes: 7 additions & 0 deletions assets/config_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
#remove if you won't profile
pprof_http_bind = "localhost:6034"

#log for insert errors (not for sync=1 requests)
#format: {"timestamp":..., "timestamp_string":..., "error": ..., "table":..., "fields":..., "rows": ...}\n
#remove or leave empty path if not needed
[insert_error_logger]
path = "error.log"
pretty_print = true

[receivers]

[receivers.first-http]
Expand Down
7 changes: 4 additions & 3 deletions cmd/dbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

type config struct {
Receivers map[string]receiver.Config `toml:"receivers"`
Inserters map[string]inserter.Config `toml:"inserters"`
PprofHttpBind string `toml:"pprof_http_bind"`
Receivers map[string]receiver.Config `toml:"receivers"`
Inserters map[string]inserter.Config `toml:"inserters"`
PprofHttpBind string `toml:"pprof_http_bind"`
InsertErrorLogger inserter.InsertErrorLoggerConfig `toml:"insert_error_logger"`
}
5 changes: 5 additions & 0 deletions cmd/dbatcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ func TestConfig(t *testing.T) {
},
},
PprofHttpBind: "localhost:6034",
InsertErrorLogger: inserter.InsertErrorLoggerConfig{
Path: "error.log",
PrettyPrint: true,
},
}

if !reflect.DeepEqual(resultingConfig, expectedConfig) {
t.Errorf("got %#v\nwant %#v", resultingConfig, expectedConfig)
t.Fail()
}
}
7 changes: 6 additions & 1 deletion cmd/dbatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ func main() {
go http.ListenAndServe(c.PprofHttpBind, nil)
}

insertErrorLogger, err := inserter.NewInsertErrorLoggerFromConfig(c.InsertErrorLogger)
if err != nil {
log.Fatalf("can't open file for insert error logger: %s", err)
}
defer insertErrorLogger.Close()
inserters := makeInserters(c)
errChan := make(chan error)
tableManagerHolder := tablemanager.NewHolder(errChan, inserters)
tableManagerHolder := tablemanager.NewHolder(errChan, inserters, insertErrorLogger)
tableManagerHolder.StopUnusedManagers()
receivers := makeAndStartReceivers(c, errChan, tableManagerHolder)

Expand Down
87 changes: 87 additions & 0 deletions internal/inserter/insert_error_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package inserter

import (
"io"
"os"
"sync"
"time"

"github.com/edwvee/dbatcher/internal/table"
jsoniter "github.com/json-iterator/go"
)

type InsertErrorLogger struct {
w io.Writer
prettyPrint bool
mut sync.Mutex
}

type insertErrorLoggerData struct {
TimeStamp int64 `json:"timestamp"`
TimeStampString string `json:"timestamp_string"`
Error string `json:"error"`
Table string `json:"table"`
Fields string `json:"string"`
Rows [][]interface{} `json:"rows"`
}

func NewInsertErrorLoggerFromConfig(config InsertErrorLoggerConfig) (*InsertErrorLogger, error) {
if config.Path == "" {
return NewInsertErrorLogger(nil, false), nil
}
f, err := os.OpenFile(config.Path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
return NewInsertErrorLogger(f, config.PrettyPrint), nil
}

func NewInsertErrorLogger(w io.Writer, prettyPrint bool) *InsertErrorLogger {
return &InsertErrorLogger{w: w, prettyPrint: prettyPrint}
}

func (l *InsertErrorLogger) Log(insertError error, t *table.Table) error {
if l.w == nil {
return nil
}

data := l.MakeData(insertError, t)

l.mut.Lock()
defer l.mut.Unlock()
encoder := jsoniter.NewEncoder(l.w)
if l.prettyPrint {
encoder.SetIndent("", " ")
}
err := encoder.Encode(data)
if err != nil {
return err
}

return err
}

func (l *InsertErrorLogger) MakeData(insertError error, t *table.Table) insertErrorLoggerData {
now := time.Now()
data := insertErrorLoggerData{
TimeStamp: now.Unix(),
TimeStampString: now.String(),
Error: insertError.Error(),
Table: t.GetTableName(),
Fields: t.GetFields(),
Rows: make([][]interface{}, 0, t.GetRowsLen()),
}
for row := t.GetNextRow(); row != nil; row = t.GetNextRow() {
data.Rows = append(data.Rows, row)
}

return data
}

func (l *InsertErrorLogger) Close() error {
if c, ok := l.w.(io.Closer); ok {
return c.Close()
}

return nil
}
6 changes: 6 additions & 0 deletions internal/inserter/insert_error_logger_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package inserter

type InsertErrorLoggerConfig struct {
Path string `toml:"path"`
PrettyPrint bool `toml:"pretty_print"`
}
65 changes: 65 additions & 0 deletions internal/inserter/insert_error_logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package inserter

import (
"bytes"
"os"
"testing"

"github.com/edwvee/dbatcher/internal/table"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
)

func TestNewInsertErrorLoggerFromConfig(t *testing.T) {
config := InsertErrorLoggerConfig{"test_path.log", true}
logger, err := NewInsertErrorLoggerFromConfig(config)
if err != nil {
t.Fatal(err)
}
err = logger.Close()
if err != nil {
t.Fatal(err)
}
err = os.Remove(config.Path)
if err != nil {
t.Fatal(err)
}
}

func TestInsertErrorLoggerLog(t *testing.T) {
buf := &bytes.Buffer{}
logger := NewInsertErrorLogger(buf, false)
tableName := "database.table"
fields := "field1,field2,field3"
table := table.NewTable(table.NewSignature(tableName, fields))
data := []byte("[[\"test\",0,2.4],[\"test_test\",17,4.4]]")
table.AppendRows(data)
errorMessage := "test error"
err := logger.Log(errors.New(errorMessage), table)
if err != nil {
t.Fatal(err)
}

var result insertErrorLoggerData
resultData := buf.Bytes()
err = jsoniter.Unmarshal(resultData, &result)
if err != nil {
t.Fatal(err)
}
if result.Error != errorMessage {
t.Errorf("wrong error message: got %s, want %s", result.Error, errorMessage)
}
if result.Table != tableName {
t.Errorf("wrong table name: got %s, want %s", result.Table, tableName)
}
if result.Fields != fields {
t.Errorf("wrong table fields: got %s, want %s", result.Fields, fields)
}
marshalledRows, err := jsoniter.Marshal(result.Rows)
if err != nil {
t.Fatal(err)
}
if string(marshalledRows) != string(data) {
t.Errorf("wrong rows after json marshalling: got %s, want %s", marshalledRows, data)
}
}
9 changes: 6 additions & 3 deletions internal/receiver/htp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func TestReceive(t *testing.T) {
inserters := map[string]inserter.Inserter{
"dummy": &inserter.DummyInserter{},
}
tmh := tablemanager.NewHolder(errChan, inserters)
logger := inserter.NewInsertErrorLogger(nil, false)
tmh := tablemanager.NewHolder(errChan, inserters, logger)
if err := rec.Init(defaultHTTPReceiverConfig, errChan, tmh); err != nil {
t.Errorf("shouldn't return error: %s", err.Error())
}
Expand Down Expand Up @@ -106,7 +107,8 @@ func TestHTTPReceiverByRequests(t *testing.T) {
inserters := map[string]inserter.Inserter{
"first": ins,
}
tmh := tablemanager.NewHolder(errChan, inserters)
logger := inserter.NewInsertErrorLogger(nil, false)
tmh := tablemanager.NewHolder(errChan, inserters, logger)
if err := rec.Init(defaultHTTPReceiverConfig, errChan, tmh); err != nil {
t.Errorf("shouldn't return error: %s", err.Error())
}
Expand Down Expand Up @@ -207,7 +209,8 @@ func TestHTTPReceiverByRequests(t *testing.T) {

func TestShutdown(t *testing.T) {
errChan := make(chan error)
tH := tablemanager.NewHolder(errChan, nil)
logger := inserter.NewInsertErrorLogger(nil, false)
tH := tablemanager.NewHolder(errChan, nil, logger)
rec := &HTTPReceiver{}
err := rec.Init(defaultHTTPReceiverConfig, errChan, tH)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,7 @@ func (t *Table) GetNextRow() []interface{} {
func (t *Table) GetRawData() []interface{} {
return t.data
}

func (t *Table) Reset() {
t.dataPos = 0
}
34 changes: 21 additions & 13 deletions internal/tablemanager/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,25 @@ type TableManager struct {
tableMut sync.Mutex
rowsJsons []byte

maxRows int64
inserters map[string]inserter.Inserter
timeoutMs int64
sendChannel chan struct{}
stopChannel chan struct{}
maxRows int64
inserters map[string]inserter.Inserter
insertErrorLogger *inserter.InsertErrorLogger
timeoutMs int64
sendChannel chan struct{}
stopChannel chan struct{}
}

//NewTableManager returns configured table manager
func NewTableManager(ts *table.Signature, config Config, inserters map[string]inserter.Inserter) *TableManager {
func NewTableManager(ts *table.Signature, config Config, inserters map[string]inserter.Inserter, insertErrorLogger *inserter.InsertErrorLogger) *TableManager {
return &TableManager{
table: table.NewTable(*ts),
rowsJsons: []byte{},
inserters: inserters,
maxRows: int64(config.MaxRows),
timeoutMs: config.TimeoutMs,
sendChannel: make(chan struct{}, 1),
stopChannel: make(chan struct{}),
table: table.NewTable(*ts),
rowsJsons: []byte{},
inserters: inserters,
insertErrorLogger: insertErrorLogger,
maxRows: int64(config.MaxRows),
timeoutMs: config.TimeoutMs,
sendChannel: make(chan struct{}, 1),
stopChannel: make(chan struct{}),
}
}

Expand Down Expand Up @@ -119,6 +121,12 @@ func (tm *TableManager) DoInsert() (err error) {
} else {
err = tm.insertConcurrently()
}
if err != nil {
tbl.Reset()
if logErr := tm.insertErrorLogger.Log(err, tbl); logErr != nil {
log.Printf("failed to write error log: %s", logErr)
}
}
tbl.Free()

return
Expand Down
22 changes: 12 additions & 10 deletions internal/tablemanager/table_manager_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ var ErrTableManagerDidntStopInTime = errors.New("didn't stop in time")
//stops them when they are not used for a long time.
//Serves as frontend to table managers.
type Holder struct {
inserters map[string]inserter.Inserter
managers map[string]*TableManager
lastManagerVisit map[string]time.Time
managersMut sync.Mutex
inserters map[string]inserter.Inserter
managers map[string]*TableManager
lastManagerVisit map[string]time.Time
managersMut sync.Mutex
insertErrorLogger *inserter.InsertErrorLogger
}

//NewHolder creates new holder
func NewHolder(errChan chan error, inserters map[string]inserter.Inserter) *Holder {
func NewHolder(errChan chan error, inserters map[string]inserter.Inserter, insertErrorLogger *inserter.InsertErrorLogger) *Holder {
return &Holder{
inserters: inserters,
managers: map[string]*TableManager{},
lastManagerVisit: map[string]time.Time{},
inserters: inserters,
managers: map[string]*TableManager{},
lastManagerVisit: map[string]time.Time{},
insertErrorLogger: insertErrorLogger,
}
}

Expand All @@ -47,7 +49,7 @@ func (h *Holder) Append(ts *table.Signature, config Config, sync bool, rowsJSON
}

//not optimized due sync is debug feature
manager := NewTableManager(ts, config, h.inserters)
manager := NewTableManager(ts, config, h.inserters, h.insertErrorLogger)
if err := manager.AppendRowsToTable(rowsJSON); err != nil {
return err
}
Expand All @@ -61,7 +63,7 @@ func (h *Holder) getTableManager(ts *table.Signature, config Config) *TableManag
manager, ok := h.managers[key]
if !ok {
log.Printf("new table: %s", key)
manager = NewTableManager(ts, config, h.inserters)
manager = NewTableManager(ts, config, h.inserters, h.insertErrorLogger)
go manager.Run()
h.managers[key] = manager
}
Expand Down
Loading

0 comments on commit eff78c4

Please sign in to comment.