Skip to content

Commit

Permalink
Merge pull request redpanda-data#154 from redpanda-data/mihaitodor-ch…
Browse files Browse the repository at this point in the history
…ange-default-output

Change default output to `reject` when `stdout` is not available
  • Loading branch information
mihaitodor authored Jan 7, 2025
2 parents 9fdcbd0 + 0779b5a commit fd6473e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file.
### Changed

- The `branch` processor no longer emits an entry in the log at error level when the child processors throw errors. (@mihaitodor)
- Streams and the StreamBuilder API now use `reject` by default when no output is specified in the config and `stdout` isn't registered (for example when the `io` components are not imported). (@mihaitodor)

## 4.42.0 - 2024-11-29

Expand Down
94 changes: 94 additions & 0 deletions internal/cli/app_test/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package app_test

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/internal/cli"
"github.com/redpanda-data/benthos/v4/internal/cli/common"
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
"github.com/redpanda-data/benthos/v4/public/service"
)

type foobarInput struct {
done bool
ackErrChan chan error
}

func newFoobarInput() *foobarInput {
return &foobarInput{
ackErrChan: make(chan error),
}
}

func (i *foobarInput) Connect(context.Context) error {
return nil
}

func (i *foobarInput) Read(context.Context) (*service.Message, service.AckFunc, error) {
if i.done {
return nil, nil, service.ErrEndOfInput
}
i.done = true

return service.NewMessage([]byte("foobar")), func(_ context.Context, err error) error {
i.ackErrChan <- err

return nil
}, nil
}

func (i *foobarInput) Close(context.Context) error {
close(i.ackErrChan)

return nil
}

func TestRunCLIShutdown(t *testing.T) {
input := newFoobarInput()

// This test sits in its own package so it will get a fresh `service.GlobalEnvironment()` that we can alter safely.
err := service.RegisterInput("foobar", service.NewConfigSpec(),
func(_ *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
return input, nil
},
)
require.NoError(t, err)

// We only imported the pure components so `reject` is selected as the default output instead of `stdout`.
_, ok := service.GlobalEnvironment().GetOutputConfig("stdout")
require.False(t, ok, "stdout output registered")

tmpDir := t.TempDir()
confPath := filepath.Join(tmpDir, "foo.yaml")

require.NoError(t, os.WriteFile(confPath, []byte(`
input:
foobar: {}
`), 0o644))

closeChan := make(chan struct{})
go func() {
require.NoError(t, cli.App(common.NewCLIOpts("", "")).Run([]string{"benthos", "run", confPath}))
close(closeChan)
}()

select {
case err := <-input.ackErrChan:
require.ErrorContains(t, err, "message rejected by default because an output is not configured")
case <-time.After(1 * time.Second):
require.Fail(t, "timeout waiting for ack error")
}

// Wait for app to shutdown automatically.
select {
case <-closeChan:
case <-time.After(1 * time.Second):
require.Fail(t, "timeout waiting for ack error")
}
}
2 changes: 1 addition & 1 deletion internal/stream/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func Spec() docs.FieldSpecs {
}
}

defaultOutput := map[string]any{"inproc": ""}
defaultOutput := map[string]any{"reject": "message rejected by default because an output is not configured"}
if _, exists := bundle.GlobalEnvironment.GetDocs("stdout", docs.TypeOutput); exists {
defaultOutput = map[string]any{
"stdout": map[string]any{},
Expand Down
92 changes: 92 additions & 0 deletions public/service/stream_builder_test/stream_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package stream_builder_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

_ "github.com/redpanda-data/benthos/v4/public/components/pure"
"github.com/redpanda-data/benthos/v4/public/service"
)

type foobarInput struct {
done bool
ackErrChan chan error
}

func newFoobarInput() *foobarInput {
return &foobarInput{
ackErrChan: make(chan error),
}
}

func (i *foobarInput) Connect(context.Context) error {
return nil
}

func (i *foobarInput) Read(context.Context) (*service.Message, service.AckFunc, error) {
if i.done {
return nil, nil, service.ErrEndOfInput
}
i.done = true

return service.NewMessage([]byte("foobar")), func(_ context.Context, err error) error {
i.ackErrChan <- err

return nil
}, nil
}

func (i *foobarInput) Close(context.Context) error {
close(i.ackErrChan)

return nil
}

func TestStreamDefaultOutput(t *testing.T) {
input := newFoobarInput()

// This test sits in its own package so it will get a fresh `service.GlobalEnvironment()` that we can alter safely.
err := service.RegisterInput("foobar", service.NewConfigSpec(),
func(_ *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
return input, nil
},
)
require.NoError(t, err)

// We only imported the pure components so `reject` is selected as the default output instead of `stdout`.
_, ok := service.GlobalEnvironment().GetOutputConfig("stdout")
require.False(t, ok, "stdout output registered")

streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(`
input:
foobar: {}
`))
require.NoError(t, streamBuilder.SetLoggerYAML("level: OFF"))

stream, err := streamBuilder.Build()
require.NoError(t, err)

closeChan := make(chan struct{})
go func() {
require.NoError(t, stream.Run(context.Background()))
close(closeChan)
}()

select {
case err := <-input.ackErrChan:
require.ErrorContains(t, err, "message rejected by default because an output is not configured")
case <-time.After(1 * time.Second):
require.Fail(t, "timeout waiting for ack error")
}

// Wait for stream to shutdown automatically.
select {
case <-closeChan:
case <-time.After(1 * time.Second):
require.Fail(t, "timeout waiting for ack error")
}
}

0 comments on commit fd6473e

Please sign in to comment.