diff --git a/.golangci.yml b/.golangci.yml index 38896d211..ca8d151dd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -18,6 +18,7 @@ linters-settings: enable-all-rules: false rules: - name: superfluous-else + - name: exported errcheck: exclude-functions: - (*github.com/redpanda-data/benthos/v4/internal/batch.Error).Failed diff --git a/internal/api/config.go b/internal/api/config.go index 1de6ef058..c4137b93f 100644 --- a/internal/api/config.go +++ b/internal/api/config.go @@ -28,7 +28,7 @@ type Config struct { BasicAuth httpserver.BasicAuthConfig `json:"basic_auth" yaml:"basic_auth"` } -// NewConfig creates a new API config with default values. +// NewConfig creates an API configuration struct fully populated with default values. func NewConfig() Config { return Config{ Address: "0.0.0.0:4195", @@ -42,6 +42,7 @@ func NewConfig() Config { } } +// FromParsed extracts the Benthos API fields from the config and returns a Benthos API config. func FromParsed(pConf *docs.ParsedConfig) (conf Config, err error) { if conf.Address, err = pConf.FieldString(fieldAddress); err != nil { return diff --git a/internal/api/dynamic_crud.go b/internal/api/dynamic_crud.go index 7fde2e44c..7a3702be1 100644 --- a/internal/api/dynamic_crud.go +++ b/internal/api/dynamic_crud.go @@ -197,6 +197,7 @@ func (d *Dynamic) HandleList(w http.ResponseWriter, r *http.Request) { } } +// HandleUptime is an http.HandleFunc for returning the uptime of the dynamic component. func (d *Dynamic) HandleUptime(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] diff --git a/internal/batch/count.go b/internal/batch/count.go index 2491bbb95..41bcc81b1 100644 --- a/internal/batch/count.go +++ b/internal/batch/count.go @@ -20,13 +20,6 @@ func CtxCollapsedCount(ctx context.Context) int { return 1 } -// CollapsedCount attempts to extract the actual number of messages that were -// collapsed into the resulting message part. This value could be greater than 1 -// when users configure processors that archive batched message parts. -func CollapsedCount(p *message.Part) int { - return CtxCollapsedCount(message.GetContext(p)) -} - // MessageCollapsedCount attempts to extract the actual number of messages that // were combined into the resulting batched message parts. This value could // differ from message.Len() when users configure processors that archive @@ -34,21 +27,12 @@ func CollapsedCount(p *message.Part) int { func MessageCollapsedCount(m message.Batch) int { total := 0 _ = m.Iter(func(i int, p *message.Part) error { - total += CollapsedCount(p) + total += CtxCollapsedCount(message.GetContext(p)) return nil }) return total } -// WithCollapsedCount returns a message part with a context indicating that this -// message is the result of collapsing a number of messages. This allows -// downstream components to know how many total messages were combined. -func WithCollapsedCount(p *message.Part, count int) *message.Part { - // Start with the previous length which could also be >1. - ctx := CtxWithCollapsedCount(message.GetContext(p), count) - return message.WithContext(ctx, p) -} - // CtxWithCollapsedCount returns a message part with a context indicating that this // message is the result of collapsing a number of messages. This allows // downstream components to know how many total messages were combined. diff --git a/internal/batch/count_test.go b/internal/batch/count_test.go index 28278d760..10b347a46 100644 --- a/internal/batch/count_test.go +++ b/internal/batch/count_test.go @@ -9,16 +9,16 @@ import ( ) func TestCount(t *testing.T) { - p1 := message.NewPart([]byte("foo bar")) + p1 := message.GetContext(message.NewPart([]byte("foo bar"))) - p2 := WithCollapsedCount(p1, 2) - p3 := WithCollapsedCount(p2, 3) - p4 := WithCollapsedCount(p1, 4) + p2 := CtxWithCollapsedCount(p1, 2) + p3 := CtxWithCollapsedCount(p2, 3) + p4 := CtxWithCollapsedCount(p1, 4) - assert.Equal(t, 1, CollapsedCount(p1)) - assert.Equal(t, 2, CollapsedCount(p2)) - assert.Equal(t, 4, CollapsedCount(p3)) - assert.Equal(t, 4, CollapsedCount(p4)) + assert.Equal(t, 1, CtxCollapsedCount(p1)) + assert.Equal(t, 2, CtxCollapsedCount(p2)) + assert.Equal(t, 4, CtxCollapsedCount(p3)) + assert.Equal(t, 4, CtxCollapsedCount(p4)) } func TestMessageCount(t *testing.T) { diff --git a/internal/bloblang/mapping/statement.go b/internal/bloblang/mapping/statement.go index 9d121b1ba..277b02fea 100644 --- a/internal/bloblang/mapping/statement.go +++ b/internal/bloblang/mapping/statement.go @@ -7,6 +7,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/value" ) +// Statement represents a bloblang mapping statement. type Statement interface { QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath) AssignmentTargets() []TargetPath @@ -16,12 +17,17 @@ type Statement interface { //------------------------------------------------------------------------------ +// SingleStatement describes an isolated mapping statement, where the result of +// a query function is to be mapped according to an Assignment. type SingleStatement struct { input []rune assignment Assignment query query.Function } +// NewSingleStatement initialises a new mapping statement from an Assignment and +// query.Function. The input parameter is an optional slice pointing to the +// parsed expression that created the statement. func NewSingleStatement(input []rune, assignment Assignment, query query.Function) *SingleStatement { return &SingleStatement{ input: input, @@ -30,18 +36,24 @@ func NewSingleStatement(input []rune, assignment Assignment, query query.Functio } } +// QueryTargets returns the query targets for the underlying query. func (s *SingleStatement) QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath) { return s.query.QueryTargets(ctx) } +// AssignmentTargets returns a representation of what the underlying assignment +// targets. func (s *SingleStatement) AssignmentTargets() []TargetPath { return []TargetPath{s.assignment.Target()} } +// Input returns the underlying parsed expression of this statement. func (s *SingleStatement) Input() []rune { return s.input } +// Execute executes this statement and applies the result onto the assigned +// destination. func (s *SingleStatement) Execute(fnContext query.FunctionContext, asContext AssignmentContext) error { res, err := s.query.Exec(fnContext) if err != nil { @@ -61,22 +73,29 @@ type rootLevelIfStatementPair struct { statements []Statement } +// RootLevelIfStatement describes an isolated conditional mapping statement. type RootLevelIfStatement struct { input []rune pairs []rootLevelIfStatementPair } +// NewRootLevelIfStatement initialises a new conditional mapping statement. The +// input parameter is a slice pointing to the parsed expression that created the +// statement. func NewRootLevelIfStatement(input []rune) *RootLevelIfStatement { return &RootLevelIfStatement{ input: input, } } +// Add adds query statement pairs to the root level if statement. func (r *RootLevelIfStatement) Add(query query.Function, statements ...Statement) *RootLevelIfStatement { r.pairs = append(r.pairs, rootLevelIfStatementPair{query: query, statements: statements}) return r } +// QueryTargets returns the query targets for the underlying conditional mapping +// statement. func (r *RootLevelIfStatement) QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath) { var paths []query.TargetPath for _, p := range r.pairs { @@ -92,6 +111,8 @@ func (r *RootLevelIfStatement) QueryTargets(ctx query.TargetsContext) (query.Tar return ctx, paths } +// AssignmentTargets returns a representation of what the underlying conditional +// mapping statement targets. func (r *RootLevelIfStatement) AssignmentTargets() []TargetPath { var paths []TargetPath for _, p := range r.pairs { @@ -102,10 +123,14 @@ func (r *RootLevelIfStatement) AssignmentTargets() []TargetPath { return paths } +// Input returns the underlying parsed expression of this conditional mapping +// statement. func (r *RootLevelIfStatement) Input() []rune { return r.input } +// Execute executes this statement if the underlying condition evaluates to +// true. func (r *RootLevelIfStatement) Execute(fnContext query.FunctionContext, asContext AssignmentContext) error { for i, p := range r.pairs { if p.query != nil { diff --git a/internal/bloblang/parser/combinators.go b/internal/bloblang/parser/combinators.go index 76b91675d..0ed4223c7 100644 --- a/internal/bloblang/parser/combinators.go +++ b/internal/bloblang/parser/combinators.go @@ -58,6 +58,8 @@ func Fail[T any](err *Error, input []rune) Result[T] { } } +// ResultInto creates a new result containing the error and the remaining data +// from the input. func ResultInto[T, L any](from Result[L]) Result[T] { return Result[T]{ Err: from.Err, @@ -299,6 +301,8 @@ var Null = func() Func[any] { } }() +// DiscardedWhitespaceNewlineComments skips over any spaces, tabs and comments +// followed by a mandatory line break. var DiscardedWhitespaceNewlineComments = DiscardAll(OneOf(SpacesAndTabs, NewlineAllowComment)) // Array parses an array literal. diff --git a/internal/bloblang/query/function_ctor.go b/internal/bloblang/query/function_ctor.go index 3de64c0c0..daa253a2e 100644 --- a/internal/bloblang/query/function_ctor.go +++ b/internal/bloblang/query/function_ctor.go @@ -11,16 +11,11 @@ type Function interface { // recreate the function. Annotation() string - // MarshalString returns a string representation of the function that could - // be parsed back into the exact equivalent function. The result will be - // normalized, which means the representation may not match the original - // input from the user. - // MarshalString() string - - // Returns a list of targets that this function attempts (or may attempt) to - // access. A context must be provided that describes the current execution - // context that this function will be executed upon, which is how it is able - // to determine the full path and origin of values that it targets. + // QueryTargets returns a list of targets that this function attempts (or + // may attempt) to access. A context must be provided that describes the + // current execution context that this function will be executed upon, which + // is how it is able to determine the full path and origin of values that it + // targets. // // A new context is returned which should be provided to methods that act // upon this function when querying their own targets. diff --git a/internal/bloblang/query/params.go b/internal/bloblang/query/params.go index 8e29644fa..bef207000 100644 --- a/internal/bloblang/query/params.go +++ b/internal/bloblang/query/params.go @@ -109,9 +109,9 @@ func ParamAny(name, description string) ParamDefinition { } } -// NoDynamic disables any form of dynamic assignment for this parameter. This is -// quite limiting (prevents variables from being used, etc) and so should only -// be used with caution. +// DisableDynamic disables any form of dynamic assignment for this parameter. +// This is quite limiting (prevents variables from being used, etc) and so +// should only be used with caution. func (d ParamDefinition) DisableDynamic() ParamDefinition { d.NoDynamic = true return d diff --git a/internal/cli/common/opts.go b/internal/cli/common/opts.go index 0d02d87fd..e2893dd3d 100644 --- a/internal/cli/common/opts.go +++ b/internal/cli/common/opts.go @@ -12,8 +12,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/log" ) -type CLIStreamBootstrapFunc func() - +// CLIOpts contains the available CLI configuration options. type CLIOpts struct { Version string DateBuilt string @@ -30,6 +29,7 @@ type CLIOpts struct { OnLoggerInit func(l log.Modular) (log.Modular, error) } +// NewCLIOpts returns a new CLIOpts instance populated with default values. func NewCLIOpts(version, dateBuilt string) *CLIOpts { binaryName := "" if len(os.Args) > 0 { @@ -57,6 +57,7 @@ func NewCLIOpts(version, dateBuilt string) *CLIOpts { } } +// ExecTemplate parses a template and applies the CLI branding information to it. func (c *CLIOpts) ExecTemplate(str string) string { t, err := template.New("cli").Parse(str) if err != nil { diff --git a/internal/cli/studio/tracing/observed.go b/internal/cli/studio/tracing/observed.go index eea1560ac..fd4279b08 100644 --- a/internal/cli/studio/tracing/observed.go +++ b/internal/cli/studio/tracing/observed.go @@ -4,12 +4,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/bundle/tracing" ) -type ObservedSummary struct { - Input int `json:"input"` - Output int `json:"output"` - ProcessorErrors int `json:"processor_errors"` -} - +// ObservedEvent observed event container. type ObservedEvent struct { Type string `json:"type"` Content string `json:"content"` diff --git a/internal/codec/reader.go b/internal/codec/reader.go index 527de6b31..4e3f45282 100644 --- a/internal/codec/reader.go +++ b/internal/codec/reader.go @@ -24,9 +24,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/message" ) -// ReaderDocs is a static field documentation for input codecs. -var ReaderDocs = NewReaderDocs("codec") - +// NewReaderDocs returns the field documentation for input codecs. func NewReaderDocs(name string) docs.FieldSpec { return docs.FieldString( name, "The way in which the bytes of a data source should be converted into discrete messages, codecs are useful for specifying how large files or continuous streams of data might be processed in small chunks rather than loading it all in memory. It's possible to consume lines using a custom delimiter with the `delim:x` codec, where x is the character sequence custom delimiter. Codecs can be chained with `/`, for example a gzip compressed CSV file can be consumed with the codec `gzip/csv`.", "lines", "delim:\t", "delim:foobar", "gzip/csv", diff --git a/internal/codec/writer.go b/internal/codec/writer.go index 0844d4a77..753ca45fa 100644 --- a/internal/codec/writer.go +++ b/internal/codec/writer.go @@ -9,8 +9,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/docs" ) -var WriterDocs = NewWriterDocs("codec") - +// NewWriterDocs returns the field documentation for writer codecs. func NewWriterDocs(name string) docs.FieldSpec { return docs.FieldString( name, "The way in which the bytes of messages should be written out into the output data stream. It's possible to write lines using a custom delimiter with the `delim:x` codec, where x is the character sequence custom delimiter.", "lines", "delim:\t", "delim:foobar", @@ -24,12 +23,16 @@ func NewWriterDocs(name string) docs.FieldSpec { //------------------------------------------------------------------------------ +// SuffixFn is a function which should be called by codec writers to determine +// when a custom suffix must be emitted by the writer codec. type SuffixFn func(data []byte) ([]byte, bool) +// WriterConfig is a general configuration struct that covers all writer codecs. type WriterConfig struct { Append bool } +// GetWriter returns a codec writer. func GetWriter(codec string) (sFn SuffixFn, appendMode bool, err error) { switch codec { case "all-bytes": diff --git a/internal/component/buffer/config.go b/internal/component/buffer/config.go index d2ff147d1..df8e1ba08 100644 --- a/internal/component/buffer/config.go +++ b/internal/component/buffer/config.go @@ -24,6 +24,7 @@ func NewConfig() Config { } } +// FromAny returns a buffer config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/cache/config.go b/internal/component/cache/config.go index 1bedc6965..eac0d36e0 100644 --- a/internal/component/cache/config.go +++ b/internal/component/cache/config.go @@ -26,6 +26,7 @@ func NewConfig() Config { } } +// FromAny returns a component cache config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/input/config.go b/internal/component/input/config.go index f9d6bc6c1..ee0f5df11 100644 --- a/internal/component/input/config.go +++ b/internal/component/input/config.go @@ -31,6 +31,7 @@ func NewConfig() Config { } } +// FromAny returns an input config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/metrics/config.go b/internal/component/metrics/config.go index 66b645ba3..8023c30e2 100644 --- a/internal/component/metrics/config.go +++ b/internal/component/metrics/config.go @@ -25,6 +25,7 @@ func NewConfig() Config { } } +// FromAny returns a metrics config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/metrics/local.go b/internal/component/metrics/local.go index 41564405b..66136c7f4 100644 --- a/internal/component/metrics/local.go +++ b/internal/component/metrics/local.go @@ -26,24 +26,27 @@ func (l *LocalStat) Incr(count int64) { atomic.AddInt64(l.Value, count) } -// Decr decrements a metric by an amount. +// Decr decrements a metric by an int64 amount. func (l *LocalStat) Decr(count int64) { atomic.AddInt64(l.Value, -count) } -// Set sets a gauge metric. +// Set sets a gauge metric to an int64 value. func (l *LocalStat) Set(value int64) { atomic.StoreInt64(l.Value, value) } +// IncrFloat64 increments a metric by a float64 amount. func (l *LocalStat) IncrFloat64(count float64) { l.Incr(int64(count)) } +// DecrFloat64 decrements a metric by a float64 amount. func (l *LocalStat) DecrFloat64(count float64) { l.Decr(int64(count)) } +// SetFloat64 sets a metric to a float64 value. func (l *LocalStat) SetFloat64(value float64) { l.Set(int64(value)) } diff --git a/internal/component/output/config.go b/internal/component/output/config.go index f2e660212..cc2b56084 100644 --- a/internal/component/output/config.go +++ b/internal/component/output/config.go @@ -31,6 +31,7 @@ func NewConfig() Config { } } +// FromAny returns an output config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/processor/config.go b/internal/component/processor/config.go index 087d32321..e6ea45cfa 100644 --- a/internal/component/processor/config.go +++ b/internal/component/processor/config.go @@ -28,6 +28,7 @@ func NewConfig() Config { } } +// FromAny returns a processor config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/ratelimit/config.go b/internal/component/ratelimit/config.go index 4371603a4..8a051218c 100644 --- a/internal/component/ratelimit/config.go +++ b/internal/component/ratelimit/config.go @@ -28,6 +28,7 @@ func NewConfig() Config { } } +// FromAny returns a ratelimit config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/scanner/config.go b/internal/component/scanner/config.go index 43d90a404..ae5c8cb61 100644 --- a/internal/component/scanner/config.go +++ b/internal/component/scanner/config.go @@ -8,11 +8,13 @@ import ( "github.com/redpanda-data/benthos/v4/internal/docs" ) +// Config is the all encompassing configuration struct for all scanner types. type Config struct { Type string Plugin any } +// FromAny returns a scanner config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/component/scanner/testutil/testutil.go b/internal/component/scanner/testutil/testutil.go index 780d8661f..e1faaa325 100644 --- a/internal/component/scanner/testutil/testutil.go +++ b/internal/component/scanner/testutil/testutil.go @@ -34,6 +34,7 @@ func (n microReader) Read(p []byte) (int, error) { return byteCount, nil } +// ScannerTestSuite returns the test suite for scanners. func ScannerTestSuite(t *testing.T, codec *service.OwnedScannerCreator, details *service.ScannerSourceDetails, data []byte, expected ...string) { if details == nil { details = &service.ScannerSourceDetails{} diff --git a/internal/component/testutil/from_yaml.go b/internal/component/testutil/from_yaml.go index e91ea29c3..b3a38737a 100644 --- a/internal/component/testutil/from_yaml.go +++ b/internal/component/testutil/from_yaml.go @@ -18,6 +18,8 @@ import ( "github.com/redpanda-data/benthos/v4/internal/stream" ) +// BufferFromYAML attempts to parse a config string and returns a buffer config +// if successful or an error otherwise. func BufferFromYAML(confStr string, args ...any) (buffer.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -26,6 +28,8 @@ func BufferFromYAML(confStr string, args ...any) (buffer.Config, error) { return buffer.FromAny(bundle.GlobalEnvironment, node) } +// CacheFromYAML attempts to parse a config string and returns a cache config +// if successful or an error otherwise. func CacheFromYAML(confStr string, args ...any) (cache.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -34,6 +38,8 @@ func CacheFromYAML(confStr string, args ...any) (cache.Config, error) { return cache.FromAny(bundle.GlobalEnvironment, node) } +// InputFromYAML attempts to parse a config string and returns an input config +// if successful or an error otherwise. func InputFromYAML(confStr string, args ...any) (input.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -42,6 +48,8 @@ func InputFromYAML(confStr string, args ...any) (input.Config, error) { return input.FromAny(bundle.GlobalEnvironment, node) } +// MetricsFromYAML attempts to parse a config string and returns a metrics +// config if successful or an error otherwise. func MetricsFromYAML(confStr string, args ...any) (metrics.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -50,6 +58,8 @@ func MetricsFromYAML(confStr string, args ...any) (metrics.Config, error) { return metrics.FromAny(bundle.GlobalEnvironment, node) } +// OutputFromYAML attempts to parse a config string and returns an output config +// if successful or an error otherwise. func OutputFromYAML(confStr string, args ...any) (output.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -58,6 +68,8 @@ func OutputFromYAML(confStr string, args ...any) (output.Config, error) { return output.FromAny(bundle.GlobalEnvironment, node) } +// ProcessorFromYAML attempts to parse a config string and returns a processor +// config if successful or an error otherwise. func ProcessorFromYAML(confStr string, args ...any) (processor.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -66,6 +78,8 @@ func ProcessorFromYAML(confStr string, args ...any) (processor.Config, error) { return processor.FromAny(bundle.GlobalEnvironment, node) } +// RateLimitFromYAML attempts to parse a config string and returns a ratelimit +// config if successful or an error otherwise. func RateLimitFromYAML(confStr string, args ...any) (ratelimit.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -74,6 +88,8 @@ func RateLimitFromYAML(confStr string, args ...any) (ratelimit.Config, error) { return ratelimit.FromAny(bundle.GlobalEnvironment, node) } +// TracerFromYAML attempts to parse a config string and returns a tracer config +// if successful or an error otherwise. func TracerFromYAML(confStr string, args ...any) (tracer.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -82,6 +98,8 @@ func TracerFromYAML(confStr string, args ...any) (tracer.Config, error) { return tracer.FromAny(bundle.GlobalEnvironment, node) } +// ManagerFromYAML attempts to parse a config string and returns a manager +// config if successful or an error otherwise. func ManagerFromYAML(confStr string, args ...any) (manager.ResourceConfig, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -90,6 +108,8 @@ func ManagerFromYAML(confStr string, args ...any) (manager.ResourceConfig, error return manager.FromAny(bundle.GlobalEnvironment, node) } +// StreamFromYAML attempts to parse a config string and returns a stream config +// if successful or an error otherwise. func StreamFromYAML(confStr string, args ...any) (stream.Config, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { @@ -106,6 +126,8 @@ func StreamFromYAML(confStr string, args ...any) (stream.Config, error) { return stream.FromParsed(bundle.GlobalEnvironment, pConf, rawSource) } +// ConfigFromYAML attempts to parse a config string and returns a Benthos +// service config if successful or an error otherwise. func ConfigFromYAML(confStr string, args ...any) (config.Type, error) { node, err := docs.UnmarshalYAML(fmt.Appendf(nil, confStr, args...)) if err != nil { diff --git a/internal/component/tracer/config.go b/internal/component/tracer/config.go index ddd865734..bedee96c7 100644 --- a/internal/component/tracer/config.go +++ b/internal/component/tracer/config.go @@ -29,6 +29,7 @@ func NewConfig() Config { } } +// FromAny returns a tracer config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/config/schema.go b/internal/config/schema.go index 921062fda..642015451 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -37,6 +37,7 @@ type Type struct { rawSource any } +// GetRawSource returns the Type raw source. func (t *Type) GetRawSource() any { return t.rawSource } @@ -89,6 +90,7 @@ func SpecWithoutStream(spec docs.FieldSpecs) docs.FieldSpecs { return fields } +// FromParsed extracts the Benthos service fields from the parsed config and returns a Benthos service config. func FromParsed(prov docs.Provider, pConf *docs.ParsedConfig, rawSource any) (conf Type, err error) { conf.rawSource = rawSource if conf.Config, err = stream.FromParsed(prov, pConf, nil); err != nil { diff --git a/internal/config/test/case.go b/internal/config/test/case.go index 458a5a3c5..32517a821 100644 --- a/internal/config/test/case.go +++ b/internal/config/test/case.go @@ -15,6 +15,7 @@ const ( fieldCaseOutputBatches = "output_batches" ) +// Case contains a definition of a single Benthos config test case. type Case struct { Name string Environment map[string]string @@ -27,6 +28,7 @@ type Case struct { line int } +// Line returns the case line. func (c *Case) Line() int { return c.line } @@ -70,6 +72,7 @@ It is also possible to target processors in a separate file by prefixing the tar } } +// CaseFromAny attempts to return a Case from the untyped input parameter. func CaseFromAny(v any) (Case, error) { pConf, err := caseFields().ParsedConfigFromAny(v) if err != nil { @@ -78,6 +81,7 @@ func CaseFromAny(v any) (Case, error) { return CaseFromParsed(pConf) } +// CaseFromParsed extracts a Case from a parsed config. func CaseFromParsed(pConf *docs.ParsedConfig) (c Case, err error) { c.line, _ = pConf.Line() if c.Name, err = pConf.FieldString(fieldCaseName); err != nil { diff --git a/internal/config/test/docs.go b/internal/config/test/docs.go index c50c16a5d..2215cbc82 100644 --- a/internal/config/test/docs.go +++ b/internal/config/test/docs.go @@ -15,6 +15,7 @@ func ConfigSpec() docs.FieldSpec { return docs.FieldObject(fieldTests, "A list of one or more unit tests to execute.").Array().WithChildren(caseFields()...).Optional() } +// FromAny returns a Case slice from a yaml node or parsed config. func FromAny(v any) ([]Case, error) { if t, ok := v.(*yaml.Node); ok { var tmp struct { @@ -45,6 +46,7 @@ func FromAny(v any) ([]Case, error) { return FromParsed(pConf) } +// FromParsed extracts a Case slice from a parsed config. func FromParsed(pConf *docs.ParsedConfig) ([]Case, error) { if !pConf.Contains(fieldTests) { return nil, nil diff --git a/internal/config/test/input.go b/internal/config/test/input.go index e09c4d3b2..c7457e89d 100644 --- a/internal/config/test/input.go +++ b/internal/config/test/input.go @@ -31,12 +31,14 @@ func inputFields() docs.FieldSpecs { } } +// InputConfig represents the test input config. type InputConfig struct { Content string Path string Metadata map[string]any } +// ToMessage creates a message from an InputConfig. func (i InputConfig) ToMessage(fs fs.FS, dir string) (*message.Part, error) { msgContent := []byte(i.Content) if i.Path != "" { @@ -55,6 +57,7 @@ func (i InputConfig) ToMessage(fs fs.FS, dir string) (*message.Part, error) { return msg, nil } +// InputFromParsed creates an InputConfig from a parsed config. func InputFromParsed(pConf *docs.ParsedConfig) (conf InputConfig, err error) { if pConf.Contains(fieldInputContent) { if conf.Content, err = pConf.FieldString(fieldInputContent); err != nil { diff --git a/internal/config/test/output.go b/internal/config/test/output.go index 37ecba586..a0a99c76f 100644 --- a/internal/config/test/output.go +++ b/internal/config/test/output.go @@ -68,12 +68,15 @@ func outputFields() docs.FieldSpecs { } } +// OutputCondition contains a test output condition. type OutputCondition interface { Check(fs fs.FS, dir string, part *message.Part) error } +// OutputConditionsMap represents a collection of output conditions. type OutputConditionsMap map[string]OutputCondition +// CheckAll runs all output condition checks. func (c OutputConditionsMap) CheckAll(fs fs.FS, dir string, part *message.Part) (errs []error) { condTypes := []string{} for k := range c { @@ -88,6 +91,7 @@ func (c OutputConditionsMap) CheckAll(fs fs.FS, dir string, part *message.Part) return } +// OutputConditionsFromParsed extracts an OutputConditionsMap from a parsed config. func OutputConditionsFromParsed(pConf *docs.ParsedConfig) (m OutputConditionsMap, err error) { m = OutputConditionsMap{} if pConf.Contains(fieldOutputBloblang) { @@ -183,6 +187,7 @@ func OutputConditionsFromParsed(pConf *docs.ParsedConfig) (m OutputConditionsMap return } +// BloblangCondition represents a test bloblang condition. type BloblangCondition struct { m *mapping.Executor } @@ -195,6 +200,7 @@ func parseBloblangCondition(expr string) (*BloblangCondition, error) { return &BloblangCondition{m}, nil } +// Check runs the bloblang condition check. func (b *BloblangCondition) Check(fs fs.FS, dir string, p *message.Part) error { msg := message.Batch{p} res, err := b.m.QueryPart(0, msg) @@ -207,8 +213,10 @@ func (b *BloblangCondition) Check(fs fs.FS, dir string, p *message.Part) error { return nil } +// ContentEqualsCondition represents a test ContentEquals condition. type ContentEqualsCondition string +// Check runs the ContentEquals condition check. func (c ContentEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error { if exp, act := string(c), string(p.AsBytes()); exp != act { return fmt.Errorf("content mismatch\n expected: %v\n received: %v", blue(exp), red(act)) @@ -216,8 +224,10 @@ func (c ContentEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) err return nil } +// ContentMatchesCondition represents a test ContentMatches condition. type ContentMatchesCondition string +// Check runs the ContentMatches condition check. func (c ContentMatchesCondition) Check(fs fs.FS, dir string, p *message.Part) error { re := regexp.MustCompile(string(c)) if !re.Match(p.AsBytes()) { @@ -226,8 +236,10 @@ func (c ContentMatchesCondition) Check(fs fs.FS, dir string, p *message.Part) er return nil } +// ContentJSONEqualsCondition represents a test ContentJSONEquals condition. type ContentJSONEqualsCondition string +// Check runs the ContentJSONEquals condition check. func (c ContentJSONEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error { jdopts := jsondiff.DefaultConsoleOptions() diff, explanation := jsondiff.Compare(p.AsBytes(), []byte(c), &jdopts) @@ -237,8 +249,10 @@ func (c ContentJSONEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) return nil } +// ContentJSONContainsCondition represents a test ContentJSONContains condition. type ContentJSONContainsCondition string +// Check runs the ContentJSONContains condition check. func (c ContentJSONContainsCondition) Check(fs fs.FS, dir string, p *message.Part) error { jdopts := jsondiff.DefaultConsoleOptions() diff, explanation := jsondiff.Compare(p.AsBytes(), []byte(c), &jdopts) @@ -248,8 +262,10 @@ func (c ContentJSONContainsCondition) Check(fs fs.FS, dir string, p *message.Par return nil } +// FileEqualsCondition represents a test FileEquals condition. type FileEqualsCondition string +// Check runs the FileEquals condition check. func (c FileEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error { relPath := filepath.Join(dir, string(c)) @@ -264,8 +280,10 @@ func (c FileEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error return nil } +// FileJSONEqualsCondition represents a test FileJSONEquals condition. type FileJSONEqualsCondition string +// Check runs the FileJSONEquals condition check. func (c FileJSONEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error { relPath := filepath.Join(dir, string(c)) @@ -278,8 +296,10 @@ func (c FileJSONEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) er return comparison.Check(fs, dir, p) } +// FileJSONContainsCondition represents a test FileJSONContains condition. type FileJSONContainsCondition string +// Check runs the FileJSONContains condition check. func (c FileJSONContainsCondition) Check(fs fs.FS, dir string, p *message.Part) error { relPath := filepath.Join(dir, string(c)) @@ -292,8 +312,10 @@ func (c FileJSONContainsCondition) Check(fs fs.FS, dir string, p *message.Part) return comparison.Check(fs, dir, p) } +// MetadataEqualsCondition represents a test MetadataEquals condition. type MetadataEqualsCondition map[string]any +// Check runs the MetadataEquals condition check. func (m MetadataEqualsCondition) Check(fs fs.FS, dir string, p *message.Part) error { for k, exp := range m { act, exists := p.MetaGetMut(k) diff --git a/internal/docs/field.go b/internal/docs/field.go index b826eec46..9df37db72 100644 --- a/internal/docs/field.go +++ b/internal/docs/field.go @@ -409,6 +409,7 @@ root = if !$options.exists(this.string()%v) { return f } +// ScrubValue applies a sensitive information scrubber to a field value. func (f FieldSpec) ScrubValue(v any) (any, error) { if f.Scrubber == "" { return v, nil @@ -431,6 +432,7 @@ func (f FieldSpec) ScrubValue(v any) (any, error) { return res, nil } +// GetLintFunc returns the field linter func. func (f FieldSpec) GetLintFunc() LintFunc { fn := f.customLintFn if fn == nil && f.Linter != "" { diff --git a/internal/docs/parsed.go b/internal/docs/parsed.go index 9b208388f..280ed3ec0 100644 --- a/internal/docs/parsed.go +++ b/internal/docs/parsed.go @@ -11,6 +11,8 @@ import ( "github.com/redpanda-data/benthos/v4/internal/value" ) +// ParsedConfigFromAny returns a plugin component configuration config from a +// parsed config, yaml node or any value. func (f FieldSpec) ParsedConfigFromAny(v any) (pConf *ParsedConfig, err error) { pConf = &ParsedConfig{} switch t := v.(type) { @@ -23,6 +25,8 @@ func (f FieldSpec) ParsedConfigFromAny(v any) (pConf *ParsedConfig, err error) { return } +// ParsedConfigFromAny returns a plugin component configuration config from a +// parsed config, yaml node or any value. func (f FieldSpecs) ParsedConfigFromAny(v any) (pConf *ParsedConfig, err error) { pConf = &ParsedConfig{} switch t := v.(type) { @@ -44,10 +48,12 @@ type ParsedConfig struct { line *int } +// Raw returns the raw config. func (p *ParsedConfig) Raw() any { return p.generic } +// Line returns the config line. func (p *ParsedConfig) Line() (int, bool) { if p.line == nil { return 0, false @@ -75,6 +81,7 @@ func (p *ParsedConfig) Field(path ...string) (any, bool) { return gObj.S(path...).Data(), true } +// FullDotPath returns the canonical path of a config. func (p *ParsedConfig) FullDotPath(path ...string) string { var fullPath []string fullPath = append(fullPath, p.hiddenPath...) diff --git a/internal/httpserver/basic_auth.go b/internal/httpserver/basic_auth.go index 083b7bdb9..e09990f41 100644 --- a/internal/httpserver/basic_auth.go +++ b/internal/httpserver/basic_auth.go @@ -122,6 +122,7 @@ func BasicAuthFieldSpec() docs.FieldSpec { ).Advanced() } +// BasicAuthConfigFromParsed extracts the auth fields from the parsed config and returns a BasicAuth config. func BasicAuthConfigFromParsed(pConf *docs.ParsedConfig) (conf BasicAuthConfig, err error) { pConf = pConf.Namespace(fieldBasicAuth) if conf.Enabled, err = pConf.FieldBool(fieldBasicAuthEnabled); err != nil { diff --git a/internal/httpserver/cors.go b/internal/httpserver/cors.go index 2f8e04a95..d4dcae327 100644 --- a/internal/httpserver/cors.go +++ b/internal/httpserver/cors.go @@ -52,6 +52,7 @@ func ServerCORSFieldSpec() docs.FieldSpec { ).AtVersion("3.63.0").Advanced() } +// CORSConfigFromParsed extracts the CORS fields from the parsed config and returns a CORS config. func CORSConfigFromParsed(pConf *docs.ParsedConfig) (conf CORSConfig, err error) { pConf = pConf.Namespace(fieldCORS) if conf.Enabled, err = pConf.FieldBool(fieldCORSEnabled); err != nil { diff --git a/internal/impl/pure/algorithms.go b/internal/impl/pure/algorithms.go index 6e616e9e3..3e0798440 100644 --- a/internal/impl/pure/algorithms.go +++ b/internal/impl/pure/algorithms.go @@ -17,12 +17,17 @@ import ( ) type ( - CompressFunc func(level int, b []byte) ([]byte, error) - CompressWriter func(level int, w io.Writer) (io.Writer, error) - DecompressFunc func(b []byte) ([]byte, error) + // CompressFunc is a func which compresses a byte slice. + CompressFunc func(level int, b []byte) ([]byte, error) + // CompressWriter is a compressor func which wraps an io.Writer. + CompressWriter func(level int, w io.Writer) (io.Writer, error) + // DecompressFunc is a func which decompresses a byte slice. + DecompressFunc func(b []byte) ([]byte, error) + // DecompressReader is a decompressor func which wraps an io.Reader. DecompressReader func(r io.Reader) (io.Reader, error) ) +// KnownCompressionAlgorithm is a unified interface for various compression algorithms. type KnownCompressionAlgorithm struct { CompressFunc CompressFunc CompressWriter CompressWriter @@ -34,6 +39,7 @@ var knownCompressionAlgorithms = map[string]KnownCompressionAlgorithm{} var knownCompressionAlgorithmsLock sync.Mutex +// AddKnownCompressionAlgorithm registers a compression algorithm. func AddKnownCompressionAlgorithm(name string, a KnownCompressionAlgorithm) struct{} { if a.CompressFunc == nil && a.CompressWriter != nil { a.CompressFunc = func(level int, b []byte) ([]byte, error) { @@ -74,6 +80,7 @@ func AddKnownCompressionAlgorithm(name string, a KnownCompressionAlgorithm) stru return struct{}{} } +// CompressionAlgsList returns the list of registered compression algorithms. func CompressionAlgsList() (v []string) { knownCompressionAlgorithmsLock.Lock() v = make([]string, 0, len(knownCompressionAlgorithms)) @@ -87,6 +94,7 @@ func CompressionAlgsList() (v []string) { return v } +// DecompressionAlgsList returns the list of registered decompression algorithms. func DecompressionAlgsList() (v []string) { knownCompressionAlgorithmsLock.Lock() v = make([]string, 0, len(knownCompressionAlgorithms)) @@ -143,15 +151,17 @@ func strToDecompressReader(str string) (DecompressReader, error) { //------------------------------------------------------------------------------ -// The Primary is written to and closed first. The Sink is closed second. +// CombinedWriteCloser combines a Primary source and a Sink. The Primary is written to and closed first. The Sink is closed second. type CombinedWriteCloser struct { Primary, Sink io.Writer } +// Read writes data to the Primary. func (c *CombinedWriteCloser) Write(b []byte) (int, error) { return c.Primary.Write(b) } +// Close closes the Primary and then the Sink. func (c *CombinedWriteCloser) Close() error { if closer, ok := c.Primary.(io.Closer); ok { if err := closer.Close(); err != nil { @@ -166,15 +176,17 @@ func (c *CombinedWriteCloser) Close() error { return nil } -// The Primary is read from and closed second. The Source is closed first. +// CombinedReadCloser combines a Primary destination and a Source. The Primary is read from and closed second. The Source is closed first. type CombinedReadCloser struct { Primary, Source io.Reader } +// Read reads data from the Primary. func (c *CombinedReadCloser) Read(b []byte) (int, error) { return c.Primary.Read(b) } +// Close closes the Source and then the Primary. func (c *CombinedReadCloser) Close() error { if closer, ok := c.Source.(io.Closer); ok { if err := closer.Close(); err != nil { diff --git a/internal/impl/pure/output_cache.go b/internal/impl/pure/output_cache.go index 7fa697b5a..e6a0a8909 100644 --- a/internal/impl/pure/output_cache.go +++ b/internal/impl/pure/output_cache.go @@ -21,6 +21,7 @@ const ( coFieldTTL = "ttl" ) +// CacheOutputSpec returns the config spec of the cache output plugin. func CacheOutputSpec() *service.ConfigSpec { return service.NewConfigSpec(). Stable(). @@ -95,6 +96,7 @@ func init() { } } +// CacheWriter is a writer implementation for the cache output plugin. type CacheWriter struct { mgr bundle.NewManagement @@ -105,7 +107,7 @@ type CacheWriter struct { log log.Modular } -// NewCacheWriter creates a writer for cache the output plugin. +// NewCacheWriter creates a writer for the cache output plugin. func NewCacheWriter(conf *service.ParsedConfig, mgr bundle.NewManagement) (*CacheWriter, error) { target, err := conf.FieldString(coFieldTarget) if err != nil { diff --git a/internal/impl/pure/processor_workflow.go b/internal/impl/pure/processor_workflow.go index ed09476bd..cc5f34151 100644 --- a/internal/impl/pure/processor_workflow.go +++ b/internal/impl/pure/processor_workflow.go @@ -275,7 +275,7 @@ type Workflow struct { mLatency metrics.StatTimer } -// NewWorkflow instanciates a new workflow processor. +// NewWorkflow instantiates a new workflow processor. func NewWorkflow(conf *service.ParsedConfig, mgr bundle.NewManagement) (*Workflow, error) { stats := mgr.Metrics() w := &Workflow{ diff --git a/internal/log/config.go b/internal/log/config.go index d0ee32c15..96705457e 100644 --- a/internal/log/config.go +++ b/internal/log/config.go @@ -71,6 +71,7 @@ func (conf *Config) UnmarshalYAML(unmarshal func(any) error) error { return nil } +// FromParsed extracts the log fields from the parsed config and returns a log config. func FromParsed(pConf *docs.ParsedConfig) (conf Config, err error) { if conf.LogLevel, err = pConf.FieldString(fieldLogLevel); err != nil { return diff --git a/internal/log/slog.go b/internal/log/slog.go index f97dd8c32..0bfefc2b7 100644 --- a/internal/log/slog.go +++ b/internal/log/slog.go @@ -12,10 +12,12 @@ type logHandler struct { slog *slog.Logger } +// NewBenthosLogAdapter creates a new Benthos log adapter. func NewBenthosLogAdapter(l *slog.Logger) *logHandler { return &logHandler{slog: l} } +// WithFields adds extra fields to the log adapter. func (l *logHandler) WithFields(fields map[string]string) Modular { tmp := l.slog for k, v := range fields { @@ -27,33 +29,41 @@ func (l *logHandler) WithFields(fields map[string]string) Modular { return c } +// With returns a Logger that includes the given attributes. Arguments are +// converted to attributes as if by the standard `Logger.Log()`. func (l *logHandler) With(keyValues ...any) Modular { c := l.clone() c.slog = l.slog.With(keyValues...) return c } +// Fatal logs at error level followed by a call to `os.Exit()`. func (l *logHandler) Fatal(format string, v ...any) { l.slog.Error(fmt.Sprintf(format, v...)) os.Exit(1) } +// Error logs at error level. func (l *logHandler) Error(format string, v ...any) { l.slog.Error(fmt.Sprintf(format, v...)) } +// Warn logs at warning level. func (l *logHandler) Warn(format string, v ...any) { l.slog.Warn(fmt.Sprintf(format, v...)) } +// Info logs at info level. func (l *logHandler) Info(format string, v ...any) { l.slog.Info(fmt.Sprintf(format, v...)) } +// Debug logs at debug level. func (l *logHandler) Debug(format string, v ...any) { l.slog.Debug(fmt.Sprintf(format, v...)) } +// Trace logs at trace level. func (l *logHandler) Trace(format string, v ...any) { l.slog.Debug(fmt.Sprintf(format, v...)) } diff --git a/internal/log/tee.go b/internal/log/tee.go index e70fe5b21..94de17276 100644 --- a/internal/log/tee.go +++ b/internal/log/tee.go @@ -4,10 +4,12 @@ type teeLogger struct { a, b Modular } +// TeeLogger creates a new log adapter that allows you to branch new modules. func TeeLogger(a, b Modular) Modular { return &teeLogger{a: a, b: b} } +// WithFields adds extra fields to the log adapter. func (t *teeLogger) WithFields(fields map[string]string) Modular { return &teeLogger{ a: t.a.WithFields(fields), @@ -15,6 +17,8 @@ func (t *teeLogger) WithFields(fields map[string]string) Modular { } } +// With returns a Logger that includes the given attributes. Arguments are +// converted to attributes as if by the standard `Logger.Log()`. func (t *teeLogger) With(keyValues ...any) Modular { return &teeLogger{ a: t.a.With(keyValues...), @@ -22,31 +26,37 @@ func (t *teeLogger) With(keyValues ...any) Modular { } } +// Fatal logs at error level followed by a call to `os.Exit()`. func (t *teeLogger) Fatal(format string, v ...any) { t.a.Fatal(format, v...) t.b.Fatal(format, v...) } +// Error logs at error level. func (t *teeLogger) Error(format string, v ...any) { t.a.Error(format, v...) t.b.Error(format, v...) } +// Warn logs at warning level. func (t *teeLogger) Warn(format string, v ...any) { t.a.Warn(format, v...) t.b.Warn(format, v...) } +// Info logs at info level. func (t *teeLogger) Info(format string, v ...any) { t.a.Info(format, v...) t.b.Info(format, v...) } +// Debug logs at debug level. func (t *teeLogger) Debug(format string, v ...any) { t.a.Debug(format, v...) t.b.Debug(format, v...) } +// Trace logs at trace level. func (t *teeLogger) Trace(format string, v ...any) { t.a.Trace(format, v...) t.b.Trace(format, v...) diff --git a/internal/manager/config.go b/internal/manager/config.go index 06d50ff82..8cded99ec 100644 --- a/internal/manager/config.go +++ b/internal/manager/config.go @@ -49,6 +49,8 @@ func (r *ResourceConfig) AddFrom(extra *ResourceConfig) error { return nil } +// FromAny returns a resource config from a parsed config, yaml node or any +// value. func FromAny(prov docs.Provider, v any) (conf ResourceConfig, err error) { var pConf *docs.ParsedConfig if pConf, err = Spec().ParsedConfigFromAny(v); err != nil { @@ -57,6 +59,7 @@ func FromAny(prov docs.Provider, v any) (conf ResourceConfig, err error) { return FromParsed(prov, pConf) } +// FromParsed extracts a resource config from a parsed config. func FromParsed(prov docs.Provider, pConf *docs.ParsedConfig) (conf ResourceConfig, err error) { conf = NewResourceConfig() diff --git a/internal/manager/input_wrapper.go b/internal/manager/input_wrapper.go index f3916caf7..843917ec4 100644 --- a/internal/manager/input_wrapper.go +++ b/internal/manager/input_wrapper.go @@ -20,6 +20,7 @@ type inputCtrl struct { closedForSwap *int32 } +// InputWrapper is a wrapper for a streamed input. type InputWrapper struct { ctrl *inputCtrl inputLock sync.Mutex @@ -28,6 +29,7 @@ type InputWrapper struct { shutSig *shutdown.Signaller } +// WrapInput wraps a streamed input and starts the transaction processing loop. func WrapInput(i input.Streamed) *InputWrapper { var s int32 w := &InputWrapper{ @@ -42,6 +44,8 @@ func WrapInput(i input.Streamed) *InputWrapper { return w } +// CloseExistingInput instructs the wrapped input to stop consuming messages and +// waits for it to shut down. func (w *InputWrapper) CloseExistingInput(ctx context.Context, forSwap bool) error { w.inputLock.Lock() tmpInput := w.ctrl.input @@ -60,6 +64,7 @@ func (w *InputWrapper) CloseExistingInput(ctx context.Context, forSwap bool) err return tmpInput.WaitForClose(ctx) } +// SwapInput swaps the wrapped input with another one. func (w *InputWrapper) SwapInput(i input.Streamed) { var s int32 w.inputLock.Lock() @@ -70,10 +75,14 @@ func (w *InputWrapper) SwapInput(i input.Streamed) { w.inputLock.Unlock() } +// TransactionChan returns a transactions channel for consuming messages from +// the wrapped input\. func (w *InputWrapper) TransactionChan() <-chan message.Transaction { return w.tranChan } +// Connected returns a boolean indicating whether the wrapped input is currently +// connected to its target. func (w *InputWrapper) Connected() bool { w.inputLock.Lock() con := w.ctrl.input != nil && w.ctrl.input.Connected() @@ -143,14 +152,21 @@ func (w *InputWrapper) loop() { } } +// TriggerStopConsuming instructs the wrapped input to start shutting down +// resources once all pending messages are delivered and acknowledged. This call +// does not block. func (w *InputWrapper) TriggerStopConsuming() { w.shutSig.TriggerSoftStop() } +// TriggerCloseNow triggers the shut down of the wrapped input but should not +// block the calling goroutine. func (w *InputWrapper) TriggerCloseNow() { w.shutSig.TriggerHardStop() } +// WaitForClose is a blocking call to wait until the wrapped input has finished +// shutting down and cleaning up resources. func (w *InputWrapper) WaitForClose(ctx context.Context) error { select { case <-w.shutSig.HasStoppedChan(): diff --git a/internal/manager/mock/manager.go b/internal/manager/mock/manager.go index 2dd55cf55..221def00d 100644 --- a/internal/manager/mock/manager.go +++ b/internal/manager/mock/manager.go @@ -64,6 +64,8 @@ func NewManager() *Manager { } } +// EngineVersion returns the version stamp associated with the underlying +// benthos engine. func (m *Manager) EngineVersion() string { return m.Version } diff --git a/internal/pipeline/constructor.go b/internal/pipeline/constructor.go index ba9c3b04f..c79ff1554 100644 --- a/internal/pipeline/constructor.go +++ b/internal/pipeline/constructor.go @@ -14,6 +14,7 @@ import ( var threadsField = docs.FieldInt("threads", "The number of threads to execute processing pipelines across.").HasDefault(-1) +// ConfigSpec returns a configuration spec for a processor pipeline. func ConfigSpec() docs.FieldSpec { return docs.FieldObject( "pipeline", "Describes optional processing pipelines used for mutating messages.", @@ -46,7 +47,7 @@ func NewConfig() Config { //------------------------------------------------------------------------------ -// New creates an input type based on an input configuration. +// New creates a processor pipeline based on a processor pipeline configuration. func New(conf Config, mgr bundle.NewManagement) (processor.Pipeline, error) { processors := make([]processor.V1, len(conf.Processors)) for j, procConf := range conf.Processors { @@ -63,6 +64,7 @@ func New(conf Config, mgr bundle.NewManagement) (processor.Pipeline, error) { return NewPool(conf.Threads, mgr.Logger(), processors...) } +// FromAny returns a pipeline config from a parsed config, yaml node or map. func FromAny(prov docs.Provider, value any) (conf Config, err error) { switch t := value.(type) { case Config: diff --git a/internal/retries/retries.go b/internal/retries/retries.go index de9ef6fe7..a2c13d482 100644 --- a/internal/retries/retries.go +++ b/internal/retries/retries.go @@ -16,6 +16,7 @@ const ( crboFieldMaxElapsedTime = "max_elapsed_time" ) +// CommonRetryBackOffFields returns a list containing the retry backoff docs fields. func CommonRetryBackOffFields( defaultMaxRetries int, defaultInitInterval string, @@ -50,6 +51,7 @@ func fieldDurationOrEmptyStr(pConf *service.ParsedConfig, path ...string) (time. return pConf.FieldDuration(path...) } +// CommonRetryBackOffCtorFromParsed extracts the retry backoff fields from the config and returns a retry backoff constructor. func CommonRetryBackOffCtorFromParsed(pConf *service.ParsedConfig) (ctor func() backoff.BackOff, err error) { var maxRetries int if maxRetries, err = pConf.FieldInt(crboFieldMaxRetries); err != nil { diff --git a/internal/stream/config.go b/internal/stream/config.go index 778f968d1..4d3741a60 100644 --- a/internal/stream/config.go +++ b/internal/stream/config.go @@ -26,10 +26,12 @@ type Config struct { rawSource any } +// GetRawSource returns the stream config raw source. func (c *Config) GetRawSource() any { return c.rawSource } +// FromParsed extracts the stream fields from the parsed config and returns a stream config. func FromParsed(prov docs.Provider, pConf *docs.ParsedConfig, rawSource any) (conf Config, err error) { conf.rawSource = rawSource var v any diff --git a/internal/tracing/otel.go b/internal/tracing/otel.go index e32877b17..265c31ac4 100644 --- a/internal/tracing/otel.go +++ b/internal/tracing/otel.go @@ -22,7 +22,7 @@ func GetSpan(p *message.Part) *Span { return GetSpanFromContext(ctx) } -// GetSpan returns a span within a context. Returns nil if the context doesn't +// GetSpanFromContext returns a span within a context. Returns nil if the context doesn't // have a span attached. func GetSpanFromContext(ctx context.Context) *Span { t := trace.SpanFromContext(ctx) diff --git a/internal/tracing/v2/otel.go b/internal/tracing/v2/otel.go index 96bbe78c8..093d2bfef 100644 --- a/internal/tracing/v2/otel.go +++ b/internal/tracing/v2/otel.go @@ -21,8 +21,8 @@ func GetSpan(p *service.Message) *Span { return GetSpanFromContext(p.Context()) } -// GetSpan returns a span within a context. Returns nil if the context doesn't -// have a span attached. +// GetSpanFromContext returns a span within a context. Returns nil if the +// context doesn't have a span attached. func GetSpanFromContext(ctx context.Context) *Span { t := trace.SpanFromContext(ctx) return OtelSpan(ctx, t) diff --git a/internal/value/type_helpers.go b/internal/value/type_helpers.go index a1d08cb49..9e614194f 100644 --- a/internal/value/type_helpers.go +++ b/internal/value/type_helpers.go @@ -308,6 +308,10 @@ func IIsNull(i any) bool { return false } +// RestrictForComparison takes a boxed value of any type sanitizes it and, +// additionally, it attempts to perform the following conversions: +// - int64, uint64 and json.Number to float64. +// - []byte to string. func RestrictForComparison(v any) any { v = ISanitize(v) switch t := v.(type) { @@ -524,14 +528,16 @@ const ( maxUint32 = ^uint32(0) maxUint16 = ^uint16(0) maxUint8 = ^uint8(0) - MaxInt = maxUint >> 1 - maxInt32 = maxUint32 >> 1 - maxInt16 = maxUint16 >> 1 - maxInt8 = maxUint8 >> 1 - MinInt = ^int64(MaxInt) - minInt32 = ^int32(maxInt32) - minInt16 = ^int16(maxInt16) - minInt8 = ^int8(maxInt8) + // MaxInt represents the maximum value for a signed integer. + MaxInt = maxUint >> 1 + maxInt32 = maxUint32 >> 1 + maxInt16 = maxUint16 >> 1 + maxInt8 = maxUint8 >> 1 + // MinInt represents the minimum value for a signed integer. + MinInt = ^int64(MaxInt) + minInt32 = ^int32(maxInt32) + minInt16 = ^int16(maxInt16) + minInt8 = ^int8(maxInt8) ) // IToInt takes a boxed value and attempts to extract a number (int64) from it @@ -883,6 +889,8 @@ func ICompare(left, right any) bool { return false } +// IGetStringMap takes a boxed value and attempts to extract a string map from +// it. func IGetStringMap(v any) (map[string]string, error) { iMap, ok := v.(map[string]any) if !ok { diff --git a/public/service/config_docs.go b/public/service/config_docs.go index f883543b8..74782d31c 100644 --- a/public/service/config_docs.go +++ b/public/service/config_docs.go @@ -107,8 +107,8 @@ type TemplateDataPluginField struct { DefaultMarshalled string } -// PluginTemplateData returns a struct containing useful documentation details, -// which can then be injected into a template in order to populate a +// TemplateData returns a struct containing useful documentation details, which +// can then be injected into a template in order to populate a // documentation website automatically. func (c *ConfigView) TemplateData() (TemplateDataPlugin, error) { _, rootOnly := map[string]struct{}{ diff --git a/public/service/config_input.go b/public/service/config_input.go index 7426fe589..b1f82a20a 100644 --- a/public/service/config_input.go +++ b/public/service/config_input.go @@ -9,6 +9,7 @@ import ( "github.com/redpanda-data/benthos/v4/internal/docs" ) +// AutoRetryNacksToggleFieldName is the configuration field name for toggling the auto-replaying of nacks. const AutoRetryNacksToggleFieldName = "auto_replay_nacks" // NewAutoRetryNacksToggleField creates a configuration field for toggling the diff --git a/public/service/environment_schema.go b/public/service/environment_schema.go index 945b1c2a5..f123eb877 100644 --- a/public/service/environment_schema.go +++ b/public/service/environment_schema.go @@ -11,6 +11,7 @@ type EnvironmentSchema struct { s schema.Full } +// GenerateSchema creates a new EnvironmentSchema. func (e *Environment) GenerateSchema(version, dateBuilt string) *EnvironmentSchema { schema := schema.New(version, dateBuilt) return &EnvironmentSchema{s: schema} diff --git a/public/service/scanner.go b/public/service/scanner.go index 4755adf0f..499520f41 100644 --- a/public/service/scanner.go +++ b/public/service/scanner.go @@ -215,6 +215,7 @@ func (s *OwnedScannerCreator) Create(rdr io.ReadCloser, aFn AckFunc, details *Sc return &OwnedScanner{strm: is}, nil } +// Close closes the scanner. func (s *OwnedScannerCreator) Close(ctx context.Context) error { return s.rdr.Close(ctx) } diff --git a/public/service/service.go b/public/service/service.go index 69a4e7ad3..7f877c576 100644 --- a/public/service/service.go +++ b/public/service/service.go @@ -45,6 +45,7 @@ func RunCLI(ctx context.Context, optFuncs ...CLIOptFunc) { _ = cli.App(cliOpts.opts).RunContext(ctx, os.Args) } +// CLIOptBuilder represents a CLI opts builder. type CLIOptBuilder struct { opts *common.CLIOpts teeLogger *slog.Logger diff --git a/public/service/stream_schema.go b/public/service/stream_schema.go index 55a2f986b..7635ffde3 100644 --- a/public/service/stream_schema.go +++ b/public/service/stream_schema.go @@ -101,10 +101,10 @@ func genSchemaExample(field docs.FieldSpec, conf docs.SanitiseConfig) ([]byte, e return eBytes, nil } -// TemplateFieldsData attempts to prepare a list of structs containing -// information for the fields within the section specified of the schema. This -// information can then be fed into a template in order to generate -// documentation for the section. +// TemplateData attempts to prepare a list of structs containing information for +// the fields within the section specified of the schema. This information can +// then be fed into a template in order to generate documentation for the +// section. func (s *ConfigSchema) TemplateData(path ...string) (TemplateDataSchema, error) { field := docs.FieldObject("", "").WithChildren(s.fields...)