diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e4987003..e2aab4ada 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file. ### Changed - The `branch` processor no longer emits an entry in the log at error level when the child processors throw errors. (@mihaitodor) +- Streams and the StreamBuilder API now use `reject` by default when no output is specified in the config and `stdout` isn't registered (for example when the `io` components are not imported). (@mihaitodor) ## 4.42.0 - 2024-11-29 diff --git a/internal/cli/app_test/app_test.go b/internal/cli/app_test/app_test.go new file mode 100644 index 000000000..77d6e3d84 --- /dev/null +++ b/internal/cli/app_test/app_test.go @@ -0,0 +1,94 @@ +package app_test + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/internal/cli" + "github.com/redpanda-data/benthos/v4/internal/cli/common" + _ "github.com/redpanda-data/benthos/v4/public/components/pure" + "github.com/redpanda-data/benthos/v4/public/service" +) + +type foobarInput struct { + done bool + ackErrChan chan error +} + +func newFoobarInput() *foobarInput { + return &foobarInput{ + ackErrChan: make(chan error), + } +} + +func (i *foobarInput) Connect(context.Context) error { + return nil +} + +func (i *foobarInput) Read(context.Context) (*service.Message, service.AckFunc, error) { + if i.done { + return nil, nil, service.ErrEndOfInput + } + i.done = true + + return service.NewMessage([]byte("foobar")), func(_ context.Context, err error) error { + i.ackErrChan <- err + + return nil + }, nil +} + +func (i *foobarInput) Close(context.Context) error { + close(i.ackErrChan) + + return nil +} + +func TestRunCLIShutdown(t *testing.T) { + input := newFoobarInput() + + // This test sits in its own package so it will get a fresh `service.GlobalEnvironment()` that we can alter safely. + err := service.RegisterInput("foobar", service.NewConfigSpec(), + func(_ *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return input, nil + }, + ) + require.NoError(t, err) + + // We only imported the pure components so `reject` is selected as the default output instead of `stdout`. + _, ok := service.GlobalEnvironment().GetOutputConfig("stdout") + require.False(t, ok, "stdout output registered") + + tmpDir := t.TempDir() + confPath := filepath.Join(tmpDir, "foo.yaml") + + require.NoError(t, os.WriteFile(confPath, []byte(` +input: + foobar: {} +`), 0o644)) + + closeChan := make(chan struct{}) + go func() { + require.NoError(t, cli.App(common.NewCLIOpts("", "")).Run([]string{"benthos", "run", confPath})) + close(closeChan) + }() + + select { + case err := <-input.ackErrChan: + require.ErrorContains(t, err, "message rejected by default because an output is not configured") + case <-time.After(1 * time.Second): + require.Fail(t, "timeout waiting for ack error") + } + + // Wait for app to shutdown automatically. + select { + case <-closeChan: + case <-time.After(1 * time.Second): + require.Fail(t, "timeout waiting for ack error") + } +} diff --git a/internal/stream/docs.go b/internal/stream/docs.go index 2f8ea64b4..091e59491 100644 --- a/internal/stream/docs.go +++ b/internal/stream/docs.go @@ -15,7 +15,7 @@ func Spec() docs.FieldSpecs { } } - defaultOutput := map[string]any{"inproc": ""} + defaultOutput := map[string]any{"reject": "message rejected by default because an output is not configured"} if _, exists := bundle.GlobalEnvironment.GetDocs("stdout", docs.TypeOutput); exists { defaultOutput = map[string]any{ "stdout": map[string]any{}, diff --git a/public/service/stream_builder_test/stream_builder_test.go b/public/service/stream_builder_test/stream_builder_test.go new file mode 100644 index 000000000..468e11fff --- /dev/null +++ b/public/service/stream_builder_test/stream_builder_test.go @@ -0,0 +1,92 @@ +package stream_builder_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + _ "github.com/redpanda-data/benthos/v4/public/components/pure" + "github.com/redpanda-data/benthos/v4/public/service" +) + +type foobarInput struct { + done bool + ackErrChan chan error +} + +func newFoobarInput() *foobarInput { + return &foobarInput{ + ackErrChan: make(chan error), + } +} + +func (i *foobarInput) Connect(context.Context) error { + return nil +} + +func (i *foobarInput) Read(context.Context) (*service.Message, service.AckFunc, error) { + if i.done { + return nil, nil, service.ErrEndOfInput + } + i.done = true + + return service.NewMessage([]byte("foobar")), func(_ context.Context, err error) error { + i.ackErrChan <- err + + return nil + }, nil +} + +func (i *foobarInput) Close(context.Context) error { + close(i.ackErrChan) + + return nil +} + +func TestStreamDefaultOutput(t *testing.T) { + input := newFoobarInput() + + // This test sits in its own package so it will get a fresh `service.GlobalEnvironment()` that we can alter safely. + err := service.RegisterInput("foobar", service.NewConfigSpec(), + func(_ *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return input, nil + }, + ) + require.NoError(t, err) + + // We only imported the pure components so `reject` is selected as the default output instead of `stdout`. + _, ok := service.GlobalEnvironment().GetOutputConfig("stdout") + require.False(t, ok, "stdout output registered") + + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(` +input: + foobar: {} +`)) + require.NoError(t, streamBuilder.SetLoggerYAML("level: OFF")) + + stream, err := streamBuilder.Build() + require.NoError(t, err) + + closeChan := make(chan struct{}) + go func() { + require.NoError(t, stream.Run(context.Background())) + close(closeChan) + }() + + select { + case err := <-input.ackErrChan: + require.ErrorContains(t, err, "message rejected by default because an output is not configured") + case <-time.After(1 * time.Second): + require.Fail(t, "timeout waiting for ack error") + } + + // Wait for stream to shutdown automatically. + select { + case <-closeChan: + case <-time.After(1 * time.Second): + require.Fail(t, "timeout waiting for ack error") + } +}