Skip to content

Commit

Permalink
Enable exported rule for the revive linter
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Jun 23, 2024
1 parent 55c3122 commit c0eb649
Show file tree
Hide file tree
Showing 55 changed files with 241 additions and 79 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ linters-settings:
enable-all-rules: false
rules:
- name: superfluous-else
- name: exported
errcheck:
exclude-functions:
- (*github.com/redpanda-data/benthos/v4/internal/batch.Error).Failed
Expand Down
3 changes: 2 additions & 1 deletion internal/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
BasicAuth httpserver.BasicAuthConfig `json:"basic_auth" yaml:"basic_auth"`
}

// NewConfig creates a new API config with default values.
// NewConfig creates an API configuration struct fully populated with default values.
func NewConfig() Config {
return Config{
Address: "0.0.0.0:4195",
Expand All @@ -42,6 +42,7 @@ func NewConfig() Config {
}
}

// FromParsed extracts the Benthos API fields from the config and returns a Benthos API config.
func FromParsed(pConf *docs.ParsedConfig) (conf Config, err error) {
if conf.Address, err = pConf.FieldString(fieldAddress); err != nil {
return
Expand Down
1 change: 1 addition & 0 deletions internal/api/dynamic_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (d *Dynamic) HandleList(w http.ResponseWriter, r *http.Request) {
}
}

// HandleUptime is an http.HandleFunc for returning the uptime of the dynamic component.
func (d *Dynamic) HandleUptime(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]

Expand Down
18 changes: 1 addition & 17 deletions internal/batch/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,19 @@ func CtxCollapsedCount(ctx context.Context) int {
return 1
}

// CollapsedCount attempts to extract the actual number of messages that were
// collapsed into the resulting message part. This value could be greater than 1
// when users configure processors that archive batched message parts.
func CollapsedCount(p *message.Part) int {
return CtxCollapsedCount(message.GetContext(p))
}

// MessageCollapsedCount attempts to extract the actual number of messages that
// were combined into the resulting batched message parts. This value could
// differ from message.Len() when users configure processors that archive
// batched message parts.
func MessageCollapsedCount(m message.Batch) int {
total := 0
_ = m.Iter(func(i int, p *message.Part) error {
total += CollapsedCount(p)
total += CtxCollapsedCount(message.GetContext(p))
return nil
})
return total
}

// WithCollapsedCount returns a message part with a context indicating that this
// message is the result of collapsing a number of messages. This allows
// downstream components to know how many total messages were combined.
func WithCollapsedCount(p *message.Part, count int) *message.Part {
// Start with the previous length which could also be >1.
ctx := CtxWithCollapsedCount(message.GetContext(p), count)
return message.WithContext(ctx, p)
}

// CtxWithCollapsedCount returns a message part with a context indicating that this
// message is the result of collapsing a number of messages. This allows
// downstream components to know how many total messages were combined.
Expand Down
16 changes: 8 additions & 8 deletions internal/batch/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import (
)

func TestCount(t *testing.T) {
p1 := message.NewPart([]byte("foo bar"))
p1 := message.GetContext(message.NewPart([]byte("foo bar")))

p2 := WithCollapsedCount(p1, 2)
p3 := WithCollapsedCount(p2, 3)
p4 := WithCollapsedCount(p1, 4)
p2 := CtxWithCollapsedCount(p1, 2)
p3 := CtxWithCollapsedCount(p2, 3)
p4 := CtxWithCollapsedCount(p1, 4)

assert.Equal(t, 1, CollapsedCount(p1))
assert.Equal(t, 2, CollapsedCount(p2))
assert.Equal(t, 4, CollapsedCount(p3))
assert.Equal(t, 4, CollapsedCount(p4))
assert.Equal(t, 1, CtxCollapsedCount(p1))
assert.Equal(t, 2, CtxCollapsedCount(p2))
assert.Equal(t, 4, CtxCollapsedCount(p3))
assert.Equal(t, 4, CtxCollapsedCount(p4))
}

func TestMessageCount(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions internal/bloblang/mapping/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/value"
)

// Statement represents a bloblang mapping statement.
type Statement interface {
QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath)
AssignmentTargets() []TargetPath
Expand All @@ -16,12 +17,17 @@ type Statement interface {

//------------------------------------------------------------------------------

// SingleStatement describes an isolated mapping statement, where the result of
// a query function is to be mapped according to an Assignment.
type SingleStatement struct {
input []rune
assignment Assignment
query query.Function
}

// NewSingleStatement initialises a new mapping statement from an Assignment and
// query.Function. The input parameter is an optional slice pointing to the
// parsed expression that created the statement.
func NewSingleStatement(input []rune, assignment Assignment, query query.Function) *SingleStatement {
return &SingleStatement{
input: input,
Expand All @@ -30,18 +36,24 @@ func NewSingleStatement(input []rune, assignment Assignment, query query.Functio
}
}

// QueryTargets returns the query targets for the underlying query.
func (s *SingleStatement) QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath) {
return s.query.QueryTargets(ctx)
}

// AssignmentTargets returns a representation of what the underlying assignment
// targets.
func (s *SingleStatement) AssignmentTargets() []TargetPath {
return []TargetPath{s.assignment.Target()}
}

// Input returns the underlying parsed expression of this statement.
func (s *SingleStatement) Input() []rune {
return s.input
}

// Execute executes this statement and applies the result onto the assigned
// destination.
func (s *SingleStatement) Execute(fnContext query.FunctionContext, asContext AssignmentContext) error {
res, err := s.query.Exec(fnContext)
if err != nil {
Expand All @@ -61,22 +73,29 @@ type rootLevelIfStatementPair struct {
statements []Statement
}

// RootLevelIfStatement describes an isolated conditional mapping statement.
type RootLevelIfStatement struct {
input []rune
pairs []rootLevelIfStatementPair
}

// NewRootLevelIfStatement initialises a new conditional mapping statement. The
// input parameter is a slice pointing to the parsed expression that created the
// statement.
func NewRootLevelIfStatement(input []rune) *RootLevelIfStatement {
return &RootLevelIfStatement{
input: input,
}
}

// Add adds query statement pairs to the root level if statement.
func (r *RootLevelIfStatement) Add(query query.Function, statements ...Statement) *RootLevelIfStatement {
r.pairs = append(r.pairs, rootLevelIfStatementPair{query: query, statements: statements})
return r
}

// QueryTargets returns the query targets for the underlying conditional mapping
// statement.
func (r *RootLevelIfStatement) QueryTargets(ctx query.TargetsContext) (query.TargetsContext, []query.TargetPath) {
var paths []query.TargetPath
for _, p := range r.pairs {
Expand All @@ -92,6 +111,8 @@ func (r *RootLevelIfStatement) QueryTargets(ctx query.TargetsContext) (query.Tar
return ctx, paths
}

// AssignmentTargets returns a representation of what the underlying conditional
// mapping statement targets.
func (r *RootLevelIfStatement) AssignmentTargets() []TargetPath {
var paths []TargetPath
for _, p := range r.pairs {
Expand All @@ -102,10 +123,14 @@ func (r *RootLevelIfStatement) AssignmentTargets() []TargetPath {
return paths
}

// Input returns the underlying parsed expression of this conditional mapping
// statement.
func (r *RootLevelIfStatement) Input() []rune {
return r.input
}

// Execute executes this statement if the underlying condition evaluates to
// true.
func (r *RootLevelIfStatement) Execute(fnContext query.FunctionContext, asContext AssignmentContext) error {
for i, p := range r.pairs {
if p.query != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/bloblang/parser/combinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func Fail[T any](err *Error, input []rune) Result[T] {
}
}

// ResultInto creates a new result containing the error and the remaining data
// from the input.
func ResultInto[T, L any](from Result[L]) Result[T] {
return Result[T]{
Err: from.Err,
Expand Down Expand Up @@ -299,6 +301,8 @@ var Null = func() Func[any] {
}
}()

// DiscardedWhitespaceNewlineComments skips over any spaces, tabs and comments
// followed by a mandatory line break.
var DiscardedWhitespaceNewlineComments = DiscardAll(OneOf(SpacesAndTabs, NewlineAllowComment))

// Array parses an array literal.
Expand Down
15 changes: 5 additions & 10 deletions internal/bloblang/query/function_ctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,11 @@ type Function interface {
// recreate the function.
Annotation() string

// MarshalString returns a string representation of the function that could
// be parsed back into the exact equivalent function. The result will be
// normalized, which means the representation may not match the original
// input from the user.
// MarshalString() string

// Returns a list of targets that this function attempts (or may attempt) to
// access. A context must be provided that describes the current execution
// context that this function will be executed upon, which is how it is able
// to determine the full path and origin of values that it targets.
// QueryTargets returns a list of targets that this function attempts (or
// may attempt) to access. A context must be provided that describes the
// current execution context that this function will be executed upon, which
// is how it is able to determine the full path and origin of values that it
// targets.
//
// A new context is returned which should be provided to methods that act
// upon this function when querying their own targets.
Expand Down
6 changes: 3 additions & 3 deletions internal/bloblang/query/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func ParamAny(name, description string) ParamDefinition {
}
}

// NoDynamic disables any form of dynamic assignment for this parameter. This is
// quite limiting (prevents variables from being used, etc) and so should only
// be used with caution.
// DisableDynamic disables any form of dynamic assignment for this parameter.
// This is quite limiting (prevents variables from being used, etc) and so
// should only be used with caution.
func (d ParamDefinition) DisableDynamic() ParamDefinition {
d.NoDynamic = true
return d
Expand Down
5 changes: 3 additions & 2 deletions internal/cli/common/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/log"
)

type CLIStreamBootstrapFunc func()

// CLIOpts contains the available CLI configuration options.
type CLIOpts struct {
Version string
DateBuilt string
Expand All @@ -30,6 +29,7 @@ type CLIOpts struct {
OnLoggerInit func(l log.Modular) (log.Modular, error)
}

// NewCLIOpts returns a new CLIOpts instance populated with default values.
func NewCLIOpts(version, dateBuilt string) *CLIOpts {
binaryName := ""
if len(os.Args) > 0 {
Expand Down Expand Up @@ -57,6 +57,7 @@ func NewCLIOpts(version, dateBuilt string) *CLIOpts {
}
}

// ExecTemplate parses a template and applies the CLI branding information to it.
func (c *CLIOpts) ExecTemplate(str string) string {
t, err := template.New("cli").Parse(str)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions internal/cli/studio/tracing/observed.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/bundle/tracing"
)

type ObservedSummary struct {
Input int `json:"input"`
Output int `json:"output"`
ProcessorErrors int `json:"processor_errors"`
}

// ObservedEvent observed event container.
type ObservedEvent struct {
Type string `json:"type"`
Content string `json:"content"`
Expand Down
4 changes: 1 addition & 3 deletions internal/codec/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/message"
)

// ReaderDocs is a static field documentation for input codecs.
var ReaderDocs = NewReaderDocs("codec")

// NewReaderDocs returns the field documentation for input codecs.
func NewReaderDocs(name string) docs.FieldSpec {
return docs.FieldString(
name, "The way in which the bytes of a data source should be converted into discrete messages, codecs are useful for specifying how large files or continuous streams of data might be processed in small chunks rather than loading it all in memory. It's possible to consume lines using a custom delimiter with the `delim:x` codec, where x is the character sequence custom delimiter. Codecs can be chained with `/`, for example a gzip compressed CSV file can be consumed with the codec `gzip/csv`.", "lines", "delim:\t", "delim:foobar", "gzip/csv",
Expand Down
7 changes: 5 additions & 2 deletions internal/codec/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"github.com/redpanda-data/benthos/v4/internal/docs"
)

var WriterDocs = NewWriterDocs("codec")

// NewWriterDocs returns the field documentation for writer codecs.
func NewWriterDocs(name string) docs.FieldSpec {
return docs.FieldString(
name, "The way in which the bytes of messages should be written out into the output data stream. It's possible to write lines using a custom delimiter with the `delim:x` codec, where x is the character sequence custom delimiter.", "lines", "delim:\t", "delim:foobar",
Expand All @@ -24,12 +23,16 @@ func NewWriterDocs(name string) docs.FieldSpec {

//------------------------------------------------------------------------------

// SuffixFn is a function which should be called by codec writers to determine
// when a custom suffix must be emitted by the writer codec.
type SuffixFn func(data []byte) ([]byte, bool)

// WriterConfig is a general configuration struct that covers all writer codecs.
type WriterConfig struct {
Append bool
}

// GetWriter returns a codec writer.
func GetWriter(codec string) (sFn SuffixFn, appendMode bool, err error) {
switch codec {
case "all-bytes":
Expand Down
1 change: 1 addition & 0 deletions internal/component/buffer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewConfig() Config {
}
}

// FromAny returns a buffer config from a parsed config, yaml node or map.
func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
Expand Down
1 change: 1 addition & 0 deletions internal/component/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewConfig() Config {
}
}

// FromAny returns a component cache config from a parsed config, yaml node or map.
func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
Expand Down
1 change: 1 addition & 0 deletions internal/component/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewConfig() Config {
}
}

// FromAny returns an input config from a parsed config, yaml node or map.
func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
Expand Down
1 change: 1 addition & 0 deletions internal/component/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewConfig() Config {
}
}

// FromAny returns a metrics config from a parsed config, yaml node or map.
func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
Expand Down
7 changes: 5 additions & 2 deletions internal/component/metrics/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,27 @@ func (l *LocalStat) Incr(count int64) {
atomic.AddInt64(l.Value, count)
}

// Decr decrements a metric by an amount.
// Decr decrements a metric by an int64 amount.
func (l *LocalStat) Decr(count int64) {
atomic.AddInt64(l.Value, -count)
}

// Set sets a gauge metric.
// Set sets a gauge metric to an int64 value.
func (l *LocalStat) Set(value int64) {
atomic.StoreInt64(l.Value, value)
}

// IncrFloat64 increments a metric by a float64 amount.
func (l *LocalStat) IncrFloat64(count float64) {
l.Incr(int64(count))
}

// DecrFloat64 decrements a metric by a float64 amount.
func (l *LocalStat) DecrFloat64(count float64) {
l.Decr(int64(count))
}

// SetFloat64 sets a metric to a float64 value.
func (l *LocalStat) SetFloat64(value float64) {
l.Set(int64(value))
}
Expand Down
Loading

0 comments on commit c0eb649

Please sign in to comment.