Skip to content

Commit

Permalink
updates benthos processor to use new javascript vm abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Jan 17, 2025
1 parent fea9f9e commit a39005c
Show file tree
Hide file tree
Showing 19 changed files with 711 additions and 1,093 deletions.
93 changes: 93 additions & 0 deletions internal/benthos_slogger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package benthos_slogger

import (
"context"
"log/slog"

"github.com/warpstreamlabs/bento/public/service"
)

var _ slog.Handler = (*benthosLogHandler)(nil)

type benthosLogHandler struct {
logger *service.Logger
attrs []slog.Attr
groups []string
}

func (h *benthosLogHandler) Enabled(ctx context.Context, level slog.Level) bool {
// We defer to the benthos logger and let it handle what leveling it wants to output
return true
}

func (h *benthosLogHandler) Handle(ctx context.Context, r slog.Record) error { //nolint:gocritic // Needs to conform to the slog.Handler interface
// Combine pre-defined attrs with record attrs
allAttrs := make([]slog.Attr, 0, len(h.attrs)+r.NumAttrs())
allAttrs = append(allAttrs, h.attrs...)

r.Attrs(func(attr slog.Attr) bool {
if !attr.Equal(slog.Attr{}) {
// Handle groups
if len(h.groups) > 0 {
last := h.groups[len(h.groups)-1]
if last != "" {
attr.Key = last + "." + attr.Key
}
}
allAttrs = append(allAttrs, attr)
}
return true
})

// Convert to key-value pairs for temporal logger
keyvals := make([]any, 0, len(allAttrs)*2)
for _, attr := range allAttrs {
keyvals = append(keyvals, attr.Key, attr.Value.Any())
}

switch r.Level {
case slog.LevelDebug:
h.logger.With(keyvals...).Debug(r.Message)
case slog.LevelInfo:
h.logger.With(keyvals...).Info(r.Message)
case slog.LevelWarn:
h.logger.With(keyvals...).Warn(r.Message)
case slog.LevelError:
h.logger.With(keyvals...).Error(r.Message)
}
return nil
}

func (h *benthosLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
newAttrs := []slog.Attr{}
newAttrs = append(newAttrs, h.attrs...)
newAttrs = append(newAttrs, attrs...)
return &benthosLogHandler{
logger: h.logger,
attrs: newAttrs,
groups: h.groups,
}
}

func (h *benthosLogHandler) WithGroup(name string) slog.Handler {
if name == "" {
return h
}
newGroups := []string{}
newGroups = append(newGroups, h.groups...)
newGroups = append(newGroups, name)
return &benthosLogHandler{
logger: h.logger,
attrs: h.attrs,
groups: newGroups,
}
}

func newBenthosLogHandler(logger *service.Logger) *benthosLogHandler {
return &benthosLogHandler{logger: logger}
}

// Returns a benthos logger wrapped as a slog.Logger to ease plugging in to the rest of the system
func NewSlogger(logger *service.Logger) *slog.Logger {
return slog.New(newBenthosLogHandler(logger))
}
170 changes: 170 additions & 0 deletions internal/javascript/functions/benthos/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package benthos_functions

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"strings"

"github.com/dop251/goja"
javascript_functions "github.com/nucleuscloud/neosync/internal/javascript/functions"
)

const (
namespace = "benthos"
)

func Get() []*javascript_functions.FunctionDefinition {
return []*javascript_functions.FunctionDefinition{
getV0Fetch(namespace),
getV0MsgSetString(namespace),
getV0MsgAsString(namespace),
getV0MsgSetStructured(namespace),
getV0MsgAsStructured(namespace),
getV0MsgSetMeta(namespace),
getV0MsgGetMeta(namespace),
getV0MsgMetaExists(namespace),
}
}

func getV0Fetch(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_fetch", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var (
url string
httpHeaders map[string]any
method = "GET"
payload = ""
)
if err := javascript_functions.ParseFunctionArguments(call, &url, &httpHeaders, &method, &payload); err != nil {
return nil, err
}

var payloadReader io.Reader
if payload != "" {
payloadReader = strings.NewReader(payload)
}

req, err := http.NewRequestWithContext(ctx, method, url, payloadReader)
if err != nil {
return nil, err
}

// Parse HTTP headers
for k, v := range httpHeaders {
vStr, _ := v.(string)
req.Header.Add(k, vStr)
}

// Do request
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return map[string]any{
"status": resp.StatusCode,
"body": string(respBody),
}, nil
}
})
}

func getV0MsgSetString(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_set_string", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var value string
if err := javascript_functions.ParseFunctionArguments(call, &value); err != nil {
return nil, err
}

r.ValueApi().SetBytes([]byte(value))
return nil, nil
}
})
}

func getV0MsgAsString(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_as_string", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
b, err := r.ValueApi().AsBytes()
if err != nil {
return nil, err
}
return string(b), nil
}
})
}

func getV0MsgSetStructured(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_set_structured", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var value any
if err := javascript_functions.ParseFunctionArguments(call, &value); err != nil {
return nil, err
}

r.ValueApi().SetStructured(value)
return nil, nil
}
})
}

func getV0MsgAsStructured(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_as_structured", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
return r.ValueApi().AsStructured()
}
})
}

func getV0MsgSetMeta(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_set_meta", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var key string
var value any
if err := javascript_functions.ParseFunctionArguments(call, &key, &value); err != nil {
return nil, err
}
r.ValueApi().MetaSetMut(key, value)
return nil, nil
}
})
}

func getV0MsgGetMeta(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_get_meta", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var key string
if err := javascript_functions.ParseFunctionArguments(call, &key); err != nil {
return nil, err
}
result, ok := r.ValueApi().MetaGet(key)
if !ok {
return nil, fmt.Errorf("key %s not found", key)
}
return result, nil
}
})
}

func getV0MsgMetaExists(namespace string) *javascript_functions.FunctionDefinition {
return javascript_functions.NewFunctionDefinition(namespace, "v0_msg_exists_meta", func(r javascript_functions.Runner) javascript_functions.Function {
return func(ctx context.Context, call goja.FunctionCall, rt *goja.Runtime, l *slog.Logger) (any, error) {
var key string
if err := javascript_functions.ParseFunctionArguments(call, &key); err != nil {
return nil, err
}
_, ok := r.ValueApi().MetaGet(key)
return ok, nil
}
})
}
12 changes: 12 additions & 0 deletions internal/javascript/functions/benthos/functions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package benthos_functions

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGet(t *testing.T) {
functions := Get()
require.NotEmpty(t, functions)
}
Loading

0 comments on commit a39005c

Please sign in to comment.