Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update network function to use a single dispatcher and buffered channel #97

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions network/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
Expand All @@ -32,8 +31,8 @@ type PasswordLookup interface {
// The file has a very simple syntax with one username / password mapping per
// line, separated by a colon. For example:
//
// alice: w0nderl4nd
// bob: bu1|der
// alice: w0nderl4nd
// bob: bu1|der
type AuthFile struct {
name string
last time.Time
Expand Down Expand Up @@ -178,7 +177,6 @@ func createCipher(password string, iv []byte) (cipher.Stream, error) {
func encryptAES256(plaintext []byte, username, password string) ([]byte, error) {
iv := make([]byte, 16)
if _, err := rand.Read(iv); err != nil {
log.Printf("rand.Read: %v", err)
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions network/fuzz.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build gofuzz
// +build gofuzz

package network // import "collectd.org/network"
Expand Down
1 change: 1 addition & 0 deletions network/fuzz_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build gofuzz
// +build gofuzz

package network // import "collectd.org/network"
Expand Down
9 changes: 8 additions & 1 deletion network/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@ const (

// DefaultBufferSize is the default size of "Buffer". This is based on the
// maximum bytes that fit into an Ethernet frame without fragmentation:
// <Ethernet frame> - (<IPv6 header> + <UDP header>) = 1500 - (40 + 8) = 1452
//
// <Ethernet frame> - (<IPv6 header> + <UDP header>) = 1500 - (40 + 8) = 1452
const DefaultBufferSize = 1452

// DefaultDispatcherBufferSize is the default depth on the dispatcher channel which
// asynchronously reads value sets from the main server Listener loop and writes them into the api.Writer
// This allows high throughput and asynchronous writes without creating the situation where an api.Writer
// that blocks can cause an infinite buildup go in flight messages and eventual OOM.
const DefaultDispatcherBufferSize = 1024

// Numeric data source type identifiers.
const (
dsTypeCounter = 0
Expand Down
3 changes: 2 additions & 1 deletion network/network_x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func TestNetwork(t *testing.T) {
Password: password,
})
if err != nil {
t.Fatal(err)
t.Error(err)
return
}

vl := &api.ValueList{
Expand Down
12 changes: 6 additions & 6 deletions network/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ParseOpts struct {
// a parse error is encountered, all ValueLists parsed to this point are
// returned as well as the error. Unknown "parts" are silently ignored.
func Parse(b []byte, opts ParseOpts) ([]*api.ValueList, error) {
return parse(b, None, opts)
return parse(b, None, opts, log.Default())
}

func readUint16(buf *bytes.Buffer) (uint16, error) {
Expand All @@ -44,7 +44,7 @@ func readUint16(buf *bytes.Buffer) (uint16, error) {
return binary.BigEndian.Uint16(read), nil
}

func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) {
func parse(b []byte, sl SecurityLevel, opts ParseOpts, lgr *log.Logger) ([]*api.ValueList, error) {
var valueLists []*api.ValueList

var state api.ValueList
Expand Down Expand Up @@ -96,7 +96,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error)
if opts.TypesDB != nil {
ds, ok := opts.TypesDB.DataSet(state.Type)
if !ok {
log.Printf("unable to find %q in TypesDB", state.Type)
lgr.Printf("unable to find %q in TypesDB", state.Type)
continue
}

Expand All @@ -110,7 +110,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error)
// Returns an error if the number of values is incorrect.
v, err := ds.Values(ifValues...)
if err != nil {
log.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err)
lgr.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err)
continue
}
vl.Values = v
Expand Down Expand Up @@ -243,7 +243,7 @@ func parseSignSHA256(pkg, payload []byte, opts ParseOpts) ([]*api.ValueList, err
return nil, errors.New("SHA256 verification failure")
}

return parse(payload, Sign, opts)
return parse(payload, Sign, opts, log.Default())
}

func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error) {
Expand All @@ -252,7 +252,7 @@ func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error
return nil, errors.New("AES256 decryption failure")
}

return parse(plaintext, Encrypt, opts)
return parse(plaintext, Encrypt, opts, log.Default())
}

func parseInt(b []byte) (uint64, error) {
Expand Down
48 changes: 37 additions & 11 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ type Server struct {
// Interface is the name of the interface to use when subscribing to a
// multicast group. Has no effect when using unicast.
Interface string

// Channel buffer on dispatcher, this limits how many packets/value lists
// can be held in flight before we block on reading new value lists from the network.
// Defaults to 1024
DispatchBufferSize uint

// Logger defines a log.Logger that can optionally be provided for handling log messages
// if none is provided a log.Default() is assigned
Logger *log.Logger
}

// ListenAndWrite listens on the provided UDP connection (or creates one using
Expand Down Expand Up @@ -78,6 +87,13 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error {
if srv.BufferSize <= 0 {
srv.BufferSize = DefaultBufferSize
}
if srv.DispatchBufferSize <= 0 {
srv.BufferSize = DefaultDispatcherBufferSize
}

if srv.Logger == nil {
srv.Logger = log.Default()
}

popts := ParseOpts{
PasswordLookup: srv.PasswordLookup,
Expand All @@ -94,11 +110,17 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error {
}()

var wg sync.WaitGroup

valueListChan := make(chan []*api.ValueList, srv.DispatchBufferSize)
wg.Add(1)
go srv.dispatcher(ctx, &wg, valueListChan)

for {
buf := make([]byte, srv.BufferSize)
n, err := srv.Conn.Read(buf)
if err != nil {
srv.Conn.Close()
close(valueListChan)
wg.Wait()
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -108,22 +130,26 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error {

valueLists, err := Parse(buf[:n], popts)
if err != nil {
log.Printf("error while parsing: %v", err)
srv.Logger.Printf("error while parsing: %v", err)
continue
}

wg.Add(1)
go func() {
defer wg.Done()
dispatch(ctx, valueLists, srv.Writer)
}()
select {
case <-ctx.Done():
//if the context closed, just continue, we will clean up on the next loop iteration
//when the srv.Conn.Read fails
case valueListChan <- valueLists:
//ALL good, we wrote to the channel
}
}
}

func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) {
for _, vl := range valueLists {
if err := d.Write(ctx, vl); err != nil {
log.Printf("error while dispatching: %v", err)
func (srv *Server) dispatcher(ctx context.Context, wg *sync.WaitGroup, valueListChan chan []*api.ValueList) {
defer wg.Done()
for vl := range valueListChan {
for _, v := range vl {
if err := srv.Writer.Write(ctx, v); err != nil {
srv.Logger.Printf("error while dispatching: %v", err)
}
}
}
}