diff --git a/README.md b/README.md index 6548f66..1ad5162 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,13 @@ A server for batching single inserts to databases. Gather many single or just sm #remove if you won't profile pprof_http_bind = "localhost:6034" +#log for insert errors (not for sync=1 requests) +#format: {"timestamp":..., "timestamp_string":..., "error": ..., "table":..., "fields":..., "rows": ...}\n +#remove or leave empty path if not needed +[insert_error_logger] + path = "error.log" + pretty_print = true + [receivers] [receivers.first-http] @@ -39,6 +46,7 @@ pprof_http_bind = "localhost:6034" #use this type for clickhouse type = "clickhouse" #connection string (look here https://github.com/ClickHouse/clickhouse-go#dsn) + #use native tcp interface, not http or mysql dsn = "tcp://localhost:9000?user=default" #maximum simultaneous connections (treat like maximum simultaneous queries) max_connections = 2 diff --git a/assets/config_example.toml b/assets/config_example.toml index d8a7df3..d5b6b80 100644 --- a/assets/config_example.toml +++ b/assets/config_example.toml @@ -4,6 +4,13 @@ #remove if you won't profile pprof_http_bind = "localhost:6034" +#log for insert errors (not for sync=1 requests) +#format: {"timestamp":..., "timestamp_string":..., "error": ..., "table":..., "fields":..., "rows": ...}\n +#remove or leave empty path if not needed +[insert_error_logger] + path = "error.log" + pretty_print = true + [receivers] [receivers.first-http] diff --git a/cmd/dbatcher/config.go b/cmd/dbatcher/config.go index be6ed7b..397ebf2 100644 --- a/cmd/dbatcher/config.go +++ b/cmd/dbatcher/config.go @@ -6,7 +6,8 @@ import ( ) type config struct { - Receivers map[string]receiver.Config `toml:"receivers"` - Inserters map[string]inserter.Config `toml:"inserters"` - PprofHttpBind string `toml:"pprof_http_bind"` + Receivers map[string]receiver.Config `toml:"receivers"` + Inserters map[string]inserter.Config `toml:"inserters"` + PprofHttpBind string `toml:"pprof_http_bind"` + InsertErrorLogger inserter.InsertErrorLoggerConfig `toml:"insert_error_logger"` } diff --git a/cmd/dbatcher/config_test.go b/cmd/dbatcher/config_test.go index b8bdc95..5abcdb2 100644 --- a/cmd/dbatcher/config_test.go +++ b/cmd/dbatcher/config_test.go @@ -36,9 +36,14 @@ func TestConfig(t *testing.T) { }, }, PprofHttpBind: "localhost:6034", + InsertErrorLogger: inserter.InsertErrorLoggerConfig{ + Path: "error.log", + PrettyPrint: true, + }, } if !reflect.DeepEqual(resultingConfig, expectedConfig) { + t.Errorf("got %#v\nwant %#v", resultingConfig, expectedConfig) t.Fail() } } diff --git a/cmd/dbatcher/main.go b/cmd/dbatcher/main.go index 5ed4606..7b71b9e 100644 --- a/cmd/dbatcher/main.go +++ b/cmd/dbatcher/main.go @@ -26,9 +26,14 @@ func main() { go http.ListenAndServe(c.PprofHttpBind, nil) } + insertErrorLogger, err := inserter.NewInsertErrorLoggerFromConfig(c.InsertErrorLogger) + if err != nil { + log.Fatalf("can't open file for insert error logger: %s", err) + } + defer insertErrorLogger.Close() inserters := makeInserters(c) errChan := make(chan error) - tableManagerHolder := tablemanager.NewHolder(errChan, inserters) + tableManagerHolder := tablemanager.NewHolder(errChan, inserters, insertErrorLogger) tableManagerHolder.StopUnusedManagers() receivers := makeAndStartReceivers(c, errChan, tableManagerHolder) diff --git a/internal/inserter/insert_error_logger.go b/internal/inserter/insert_error_logger.go new file mode 100644 index 0000000..14bc8e7 --- /dev/null +++ b/internal/inserter/insert_error_logger.go @@ -0,0 +1,87 @@ +package inserter + +import ( + "io" + "os" + "sync" + "time" + + "github.com/edwvee/dbatcher/internal/table" + jsoniter "github.com/json-iterator/go" +) + +type InsertErrorLogger struct { + w io.Writer + prettyPrint bool + mut sync.Mutex +} + +type insertErrorLoggerData struct { + TimeStamp int64 `json:"timestamp"` + TimeStampString string `json:"timestamp_string"` + Error string `json:"error"` + Table string `json:"table"` + Fields string `json:"string"` + Rows [][]interface{} `json:"rows"` +} + +func NewInsertErrorLoggerFromConfig(config InsertErrorLoggerConfig) (*InsertErrorLogger, error) { + if config.Path == "" { + return NewInsertErrorLogger(nil, false), nil + } + f, err := os.OpenFile(config.Path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + return NewInsertErrorLogger(f, config.PrettyPrint), nil +} + +func NewInsertErrorLogger(w io.Writer, prettyPrint bool) *InsertErrorLogger { + return &InsertErrorLogger{w: w, prettyPrint: prettyPrint} +} + +func (l *InsertErrorLogger) Log(insertError error, t *table.Table) error { + if l.w == nil { + return nil + } + + data := l.MakeData(insertError, t) + + l.mut.Lock() + defer l.mut.Unlock() + encoder := jsoniter.NewEncoder(l.w) + if l.prettyPrint { + encoder.SetIndent("", " ") + } + err := encoder.Encode(data) + if err != nil { + return err + } + + return err +} + +func (l *InsertErrorLogger) MakeData(insertError error, t *table.Table) insertErrorLoggerData { + now := time.Now() + data := insertErrorLoggerData{ + TimeStamp: now.Unix(), + TimeStampString: now.String(), + Error: insertError.Error(), + Table: t.GetTableName(), + Fields: t.GetFields(), + Rows: make([][]interface{}, 0, t.GetRowsLen()), + } + for row := t.GetNextRow(); row != nil; row = t.GetNextRow() { + data.Rows = append(data.Rows, row) + } + + return data +} + +func (l *InsertErrorLogger) Close() error { + if c, ok := l.w.(io.Closer); ok { + return c.Close() + } + + return nil +} diff --git a/internal/inserter/insert_error_logger_config.go b/internal/inserter/insert_error_logger_config.go new file mode 100644 index 0000000..8d871d0 --- /dev/null +++ b/internal/inserter/insert_error_logger_config.go @@ -0,0 +1,6 @@ +package inserter + +type InsertErrorLoggerConfig struct { + Path string `toml:"path"` + PrettyPrint bool `toml:"pretty_print"` +} diff --git a/internal/inserter/insert_error_logger_test.go b/internal/inserter/insert_error_logger_test.go new file mode 100644 index 0000000..b3ed5f3 --- /dev/null +++ b/internal/inserter/insert_error_logger_test.go @@ -0,0 +1,65 @@ +package inserter + +import ( + "bytes" + "os" + "testing" + + "github.com/edwvee/dbatcher/internal/table" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" +) + +func TestNewInsertErrorLoggerFromConfig(t *testing.T) { + config := InsertErrorLoggerConfig{"test_path.log", true} + logger, err := NewInsertErrorLoggerFromConfig(config) + if err != nil { + t.Fatal(err) + } + err = logger.Close() + if err != nil { + t.Fatal(err) + } + err = os.Remove(config.Path) + if err != nil { + t.Fatal(err) + } +} + +func TestInsertErrorLoggerLog(t *testing.T) { + buf := &bytes.Buffer{} + logger := NewInsertErrorLogger(buf, false) + tableName := "database.table" + fields := "field1,field2,field3" + table := table.NewTable(table.NewSignature(tableName, fields)) + data := []byte("[[\"test\",0,2.4],[\"test_test\",17,4.4]]") + table.AppendRows(data) + errorMessage := "test error" + err := logger.Log(errors.New(errorMessage), table) + if err != nil { + t.Fatal(err) + } + + var result insertErrorLoggerData + resultData := buf.Bytes() + err = jsoniter.Unmarshal(resultData, &result) + if err != nil { + t.Fatal(err) + } + if result.Error != errorMessage { + t.Errorf("wrong error message: got %s, want %s", result.Error, errorMessage) + } + if result.Table != tableName { + t.Errorf("wrong table name: got %s, want %s", result.Table, tableName) + } + if result.Fields != fields { + t.Errorf("wrong table fields: got %s, want %s", result.Fields, fields) + } + marshalledRows, err := jsoniter.Marshal(result.Rows) + if err != nil { + t.Fatal(err) + } + if string(marshalledRows) != string(data) { + t.Errorf("wrong rows after json marshalling: got %s, want %s", marshalledRows, data) + } +} diff --git a/internal/receiver/htp_receiver_test.go b/internal/receiver/htp_receiver_test.go index bb14009..04fe9e7 100644 --- a/internal/receiver/htp_receiver_test.go +++ b/internal/receiver/htp_receiver_test.go @@ -38,7 +38,8 @@ func TestReceive(t *testing.T) { inserters := map[string]inserter.Inserter{ "dummy": &inserter.DummyInserter{}, } - tmh := tablemanager.NewHolder(errChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := tablemanager.NewHolder(errChan, inserters, logger) if err := rec.Init(defaultHTTPReceiverConfig, errChan, tmh); err != nil { t.Errorf("shouldn't return error: %s", err.Error()) } @@ -106,7 +107,8 @@ func TestHTTPReceiverByRequests(t *testing.T) { inserters := map[string]inserter.Inserter{ "first": ins, } - tmh := tablemanager.NewHolder(errChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := tablemanager.NewHolder(errChan, inserters, logger) if err := rec.Init(defaultHTTPReceiverConfig, errChan, tmh); err != nil { t.Errorf("shouldn't return error: %s", err.Error()) } @@ -207,7 +209,8 @@ func TestHTTPReceiverByRequests(t *testing.T) { func TestShutdown(t *testing.T) { errChan := make(chan error) - tH := tablemanager.NewHolder(errChan, nil) + logger := inserter.NewInsertErrorLogger(nil, false) + tH := tablemanager.NewHolder(errChan, nil, logger) rec := &HTTPReceiver{} err := rec.Init(defaultHTTPReceiverConfig, errChan, tH) if err != nil { diff --git a/internal/table/table.go b/internal/table/table.go index 4899b4a..a6fb957 100644 --- a/internal/table/table.go +++ b/internal/table/table.go @@ -102,3 +102,7 @@ func (t *Table) GetNextRow() []interface{} { func (t *Table) GetRawData() []interface{} { return t.data } + +func (t *Table) Reset() { + t.dataPos = 0 +} diff --git a/internal/tablemanager/table_manager.go b/internal/tablemanager/table_manager.go index e2b8774..e80d064 100644 --- a/internal/tablemanager/table_manager.go +++ b/internal/tablemanager/table_manager.go @@ -19,23 +19,25 @@ type TableManager struct { tableMut sync.Mutex rowsJsons []byte - maxRows int64 - inserters map[string]inserter.Inserter - timeoutMs int64 - sendChannel chan struct{} - stopChannel chan struct{} + maxRows int64 + inserters map[string]inserter.Inserter + insertErrorLogger *inserter.InsertErrorLogger + timeoutMs int64 + sendChannel chan struct{} + stopChannel chan struct{} } //NewTableManager returns configured table manager -func NewTableManager(ts *table.Signature, config Config, inserters map[string]inserter.Inserter) *TableManager { +func NewTableManager(ts *table.Signature, config Config, inserters map[string]inserter.Inserter, insertErrorLogger *inserter.InsertErrorLogger) *TableManager { return &TableManager{ - table: table.NewTable(*ts), - rowsJsons: []byte{}, - inserters: inserters, - maxRows: int64(config.MaxRows), - timeoutMs: config.TimeoutMs, - sendChannel: make(chan struct{}, 1), - stopChannel: make(chan struct{}), + table: table.NewTable(*ts), + rowsJsons: []byte{}, + inserters: inserters, + insertErrorLogger: insertErrorLogger, + maxRows: int64(config.MaxRows), + timeoutMs: config.TimeoutMs, + sendChannel: make(chan struct{}, 1), + stopChannel: make(chan struct{}), } } @@ -119,6 +121,12 @@ func (tm *TableManager) DoInsert() (err error) { } else { err = tm.insertConcurrently() } + if err != nil { + tbl.Reset() + if logErr := tm.insertErrorLogger.Log(err, tbl); logErr != nil { + log.Printf("failed to write error log: %s", logErr) + } + } tbl.Free() return diff --git a/internal/tablemanager/table_manager_holder.go b/internal/tablemanager/table_manager_holder.go index 42d6f2d..8f8dd60 100644 --- a/internal/tablemanager/table_manager_holder.go +++ b/internal/tablemanager/table_manager_holder.go @@ -22,18 +22,20 @@ var ErrTableManagerDidntStopInTime = errors.New("didn't stop in time") //stops them when they are not used for a long time. //Serves as frontend to table managers. type Holder struct { - inserters map[string]inserter.Inserter - managers map[string]*TableManager - lastManagerVisit map[string]time.Time - managersMut sync.Mutex + inserters map[string]inserter.Inserter + managers map[string]*TableManager + lastManagerVisit map[string]time.Time + managersMut sync.Mutex + insertErrorLogger *inserter.InsertErrorLogger } //NewHolder creates new holder -func NewHolder(errChan chan error, inserters map[string]inserter.Inserter) *Holder { +func NewHolder(errChan chan error, inserters map[string]inserter.Inserter, insertErrorLogger *inserter.InsertErrorLogger) *Holder { return &Holder{ - inserters: inserters, - managers: map[string]*TableManager{}, - lastManagerVisit: map[string]time.Time{}, + inserters: inserters, + managers: map[string]*TableManager{}, + lastManagerVisit: map[string]time.Time{}, + insertErrorLogger: insertErrorLogger, } } @@ -47,7 +49,7 @@ func (h *Holder) Append(ts *table.Signature, config Config, sync bool, rowsJSON } //not optimized due sync is debug feature - manager := NewTableManager(ts, config, h.inserters) + manager := NewTableManager(ts, config, h.inserters, h.insertErrorLogger) if err := manager.AppendRowsToTable(rowsJSON); err != nil { return err } @@ -61,7 +63,7 @@ func (h *Holder) getTableManager(ts *table.Signature, config Config) *TableManag manager, ok := h.managers[key] if !ok { log.Printf("new table: %s", key) - manager = NewTableManager(ts, config, h.inserters) + manager = NewTableManager(ts, config, h.inserters, h.insertErrorLogger) go manager.Run() h.managers[key] = manager } diff --git a/internal/tablemanager/table_manager_holder_test.go b/internal/tablemanager/table_manager_holder_test.go index 687f5f7..ccbd856 100644 --- a/internal/tablemanager/table_manager_holder_test.go +++ b/internal/tablemanager/table_manager_holder_test.go @@ -12,7 +12,8 @@ import ( ) func TestNewTableManagerHolder(t *testing.T) { - tmh := NewHolder(defaultTestErrChan, defaultTestInserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, defaultTestInserters, logger) if !reflect.DeepEqual(tmh.inserters, defaultTestInserters) { t.Errorf("unequal inserters: %v, %v", tmh.inserters, defaultTestInserters) } @@ -32,7 +33,8 @@ func TestNewTableManagerHolder(t *testing.T) { func TestGetTableManager(t *testing.T) { tmc := defaultTestTableManagerConfig - tmh := NewHolder(defaultTestErrChan, defaultTestInserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, defaultTestInserters, logger) tm := tmh.getTableManager(&defaultTestTableSignature, tmc) if tm == nil { t.Error("got nil table manager") @@ -66,7 +68,8 @@ func TestGetTableManager(t *testing.T) { func TestStopUnusedTableManagers(t *testing.T) { tmc := defaultTestTableManagerConfig - tmh := NewHolder(defaultTestErrChan, defaultTestInserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, defaultTestInserters, logger) tmh.getTableManager(&defaultTestTableSignature, tmc) if len(tmh.managers) == 0 { t.Errorf("should present table manager in map") @@ -92,7 +95,8 @@ func TestStopTableManagersPositive(t *testing.T) { si := &selfSliceInserter{} si.Init(inserter.Config{}) inserters := map[string]inserter.Inserter{"self slice inserter": si} - tmh := NewHolder(defaultTestErrChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, inserters, logger) tmh.getTableManager(&defaultTestTableSignature, tmc) const managersSize = 10 @@ -118,7 +122,8 @@ func TestStopTableManagersPositive(t *testing.T) { func TestStopTableManagersNegative(t *testing.T) { tmc := defaultTestTableManagerConfig inserters := map[string]inserter.Inserter{"first": &longSleepInserter{}, "second": &longSleepInserter{}} - tmh := NewHolder(defaultTestErrChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, inserters, logger) tmh.getTableManager(&defaultTestTableSignature, tmc) const managersSize = 10 @@ -146,7 +151,8 @@ func TestTableManagerAppendSyncPositive(t *testing.T) { si := &selfSliceInserter{} si.Init(inserter.Config{}) inserters := map[string]inserter.Inserter{"self slice inserter": si} - tmh := NewHolder(defaultTestErrChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, inserters, logger) ts := table.NewSignature("database.`table`", "field1") err := tmh.Append(&ts, tmc, true, []byte("[[1]]")) if err != nil { @@ -163,7 +169,8 @@ func TestTableManagerAppendSyncNegative(t *testing.T) { si := &selfSliceInserter{} si.Init(inserter.Config{}) inserters := map[string]inserter.Inserter{"self slice inserter": si} - tmh := NewHolder(defaultTestErrChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := NewHolder(defaultTestErrChan, inserters, logger) ts := table.NewSignature("database.`table`", "field1") err := tmh.Append(&ts, tmc, true, []byte("[[1]56]")) if err == nil { diff --git a/internal/tablemanager/table_manager_test.go b/internal/tablemanager/table_manager_test.go index b42b1bb..1f134b2 100644 --- a/internal/tablemanager/table_manager_test.go +++ b/internal/tablemanager/table_manager_test.go @@ -11,9 +11,10 @@ import ( ) func TestNewTableManager(t *testing.T) { + logger := inserter.NewInsertErrorLogger(nil, false) tm := NewTableManager( &defaultTestTableSignature, defaultTestTableManagerConfig, - defaultTestInserters, + defaultTestInserters, logger, ) tmExpected := &TableManager{ table: table.NewTable(defaultTestTableSignature), @@ -50,7 +51,8 @@ func TestShouldAppendWhenTooMuchRows(t *testing.T) { si := &selfSliceInserter{} si.Init(inserter.Config{}) inserters := map[string]inserter.Inserter{"self slice inserter": si} - tm := NewTableManager(&defaultTestTableSignature, tmc, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tm := NewTableManager(&defaultTestTableSignature, tmc, inserters, logger) go tm.Run() for i := 0; i < maxRows; i++ { err := tm.AppendRowsToTable([]byte("[[1,2,3]]")) @@ -69,7 +71,8 @@ func TestShouldAppendWhenTooMuchRows(t *testing.T) { func TestShouldReturnMultiError(t *testing.T) { tmc := NewConfig(1000, 100, false) inserters := map[string]inserter.Inserter{"1": &errorInserter{}, "2": &errorInserter{}} - tm := NewTableManager(&defaultTestTableSignature, tmc, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tm := NewTableManager(&defaultTestTableSignature, tmc, inserters, logger) err := tm.AppendRowsToTable([]byte("[[1,2,3]]")) if err != nil { t.Fatal(err) diff --git a/pkg/httpclient/client_test.go b/pkg/httpclient/client_test.go index a02dae9..860a038 100644 --- a/pkg/httpclient/client_test.go +++ b/pkg/httpclient/client_test.go @@ -84,7 +84,8 @@ func TestSend(t *testing.T) { ins.Init(inserter.Config{}) inserters := map[string]inserter.Inserter{"first": ins} errChan := make(chan error) - tmh := tablemanager.NewHolder(errChan, inserters) + logger := inserter.NewInsertErrorLogger(nil, false) + tmh := tablemanager.NewHolder(errChan, inserters, logger) rec := &receiver.HTTPReceiver{} rec.Init(receiver.Config{Bind: bind}, errChan, tmh) rec.Receive()