diff --git a/network/crypto.go b/network/crypto.go index c98523c..c37e662 100644 --- a/network/crypto.go +++ b/network/crypto.go @@ -13,7 +13,6 @@ import ( "errors" "fmt" "io" - "log" "os" "strings" "sync" @@ -32,8 +31,8 @@ type PasswordLookup interface { // The file has a very simple syntax with one username / password mapping per // line, separated by a colon. For example: // -// alice: w0nderl4nd -// bob: bu1|der +// alice: w0nderl4nd +// bob: bu1|der type AuthFile struct { name string last time.Time @@ -178,7 +177,6 @@ func createCipher(password string, iv []byte) (cipher.Stream, error) { func encryptAES256(plaintext []byte, username, password string) ([]byte, error) { iv := make([]byte, 16) if _, err := rand.Read(iv); err != nil { - log.Printf("rand.Read: %v", err) return nil, err } diff --git a/network/fuzz.go b/network/fuzz.go index 8dbc225..462ee14 100644 --- a/network/fuzz.go +++ b/network/fuzz.go @@ -1,3 +1,4 @@ +//go:build gofuzz // +build gofuzz package network // import "collectd.org/network" diff --git a/network/fuzz_test.go b/network/fuzz_test.go index 367df29..2af97d5 100644 --- a/network/fuzz_test.go +++ b/network/fuzz_test.go @@ -1,3 +1,4 @@ +//go:build gofuzz // +build gofuzz package network // import "collectd.org/network" diff --git a/network/main.go b/network/main.go index 7cc76f4..0aeda13 100644 --- a/network/main.go +++ b/network/main.go @@ -13,9 +13,16 @@ const ( // DefaultBufferSize is the default size of "Buffer". This is based on the // maximum bytes that fit into an Ethernet frame without fragmentation: -// - ( + ) = 1500 - (40 + 8) = 1452 +// +// - ( + ) = 1500 - (40 + 8) = 1452 const DefaultBufferSize = 1452 +// DefaultDispatcherBufferSize is the default depth on the dispatcher channel which +// asynchronously reads value sets from the main server Listener loop and writes them into the api.Writer +// This allows high throughput and asynchronous writes without creating the situation where an api.Writer +// that blocks can cause an infinite buildup go in flight messages and eventual OOM. +const DefaultDispatcherBufferSize = 1024 + // Numeric data source type identifiers. const ( dsTypeCounter = 0 diff --git a/network/network_x_test.go b/network/network_x_test.go index d6f4d41..2ce21f7 100644 --- a/network/network_x_test.go +++ b/network/network_x_test.go @@ -69,7 +69,8 @@ func TestNetwork(t *testing.T) { Password: password, }) if err != nil { - t.Fatal(err) + t.Error(err) + return } vl := &api.ValueList{ diff --git a/network/parse.go b/network/parse.go index 0779c58..6669eab 100644 --- a/network/parse.go +++ b/network/parse.go @@ -33,7 +33,7 @@ type ParseOpts struct { // a parse error is encountered, all ValueLists parsed to this point are // returned as well as the error. Unknown "parts" are silently ignored. func Parse(b []byte, opts ParseOpts) ([]*api.ValueList, error) { - return parse(b, None, opts) + return parse(b, None, opts, log.Default()) } func readUint16(buf *bytes.Buffer) (uint16, error) { @@ -44,7 +44,7 @@ func readUint16(buf *bytes.Buffer) (uint16, error) { return binary.BigEndian.Uint16(read), nil } -func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) { +func parse(b []byte, sl SecurityLevel, opts ParseOpts, lgr *log.Logger) ([]*api.ValueList, error) { var valueLists []*api.ValueList var state api.ValueList @@ -96,7 +96,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) if opts.TypesDB != nil { ds, ok := opts.TypesDB.DataSet(state.Type) if !ok { - log.Printf("unable to find %q in TypesDB", state.Type) + lgr.Printf("unable to find %q in TypesDB", state.Type) continue } @@ -110,7 +110,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) // Returns an error if the number of values is incorrect. v, err := ds.Values(ifValues...) if err != nil { - log.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err) + lgr.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err) continue } vl.Values = v @@ -243,7 +243,7 @@ func parseSignSHA256(pkg, payload []byte, opts ParseOpts) ([]*api.ValueList, err return nil, errors.New("SHA256 verification failure") } - return parse(payload, Sign, opts) + return parse(payload, Sign, opts, log.Default()) } func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error) { @@ -252,7 +252,7 @@ func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error return nil, errors.New("AES256 decryption failure") } - return parse(plaintext, Encrypt, opts) + return parse(plaintext, Encrypt, opts, log.Default()) } func parseInt(b []byte) (uint64, error) { diff --git a/network/server.go b/network/server.go index 5bded61..bf4dff7 100644 --- a/network/server.go +++ b/network/server.go @@ -39,6 +39,15 @@ type Server struct { // Interface is the name of the interface to use when subscribing to a // multicast group. Has no effect when using unicast. Interface string + + // Channel buffer on dispatcher, this limits how many packets/value lists + // can be held in flight before we block on reading new value lists from the network. + // Defaults to 1024 + DispatchBufferSize uint + + // Logger defines a log.Logger that can optionally be provided for handling log messages + // if none is provided a log.Default() is assigned + Logger *log.Logger } // ListenAndWrite listens on the provided UDP connection (or creates one using @@ -78,6 +87,13 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { if srv.BufferSize <= 0 { srv.BufferSize = DefaultBufferSize } + if srv.DispatchBufferSize <= 0 { + srv.BufferSize = DefaultDispatcherBufferSize + } + + if srv.Logger == nil { + srv.Logger = log.Default() + } popts := ParseOpts{ PasswordLookup: srv.PasswordLookup, @@ -94,11 +110,17 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { }() var wg sync.WaitGroup + + valueListChan := make(chan []*api.ValueList, srv.DispatchBufferSize) + wg.Add(1) + go srv.dispatcher(ctx, &wg, valueListChan) + for { buf := make([]byte, srv.BufferSize) n, err := srv.Conn.Read(buf) if err != nil { srv.Conn.Close() + close(valueListChan) wg.Wait() if ctx.Err() != nil { return ctx.Err() @@ -108,22 +130,26 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { valueLists, err := Parse(buf[:n], popts) if err != nil { - log.Printf("error while parsing: %v", err) + srv.Logger.Printf("error while parsing: %v", err) continue } - - wg.Add(1) - go func() { - defer wg.Done() - dispatch(ctx, valueLists, srv.Writer) - }() + select { + case <-ctx.Done(): + //if the context closed, just continue, we will clean up on the next loop iteration + //when the srv.Conn.Read fails + case valueListChan <- valueLists: + //ALL good, we wrote to the channel + } } } -func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) { - for _, vl := range valueLists { - if err := d.Write(ctx, vl); err != nil { - log.Printf("error while dispatching: %v", err) +func (srv *Server) dispatcher(ctx context.Context, wg *sync.WaitGroup, valueListChan chan []*api.ValueList) { + defer wg.Done() + for vl := range valueListChan { + for _, v := range vl { + if err := srv.Writer.Write(ctx, v); err != nil { + srv.Logger.Printf("error while dispatching: %v", err) + } } } }