Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Okhlopkov <[email protected]>
  • Loading branch information
Pavel Okhlopkov committed Oct 28, 2024
2 parents 10ceed7 + 9fbc545 commit 85b478e
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 73 deletions.
8 changes: 4 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {
temporalValues map[string]*TemporalValue
expireTicker *time.Ticker

logEntry *unilogger.Logger
logger *unilogger.Logger
}

func NewConfig(logger *unilogger.Logger) *Config {
Expand All @@ -52,7 +52,7 @@ func NewConfig(logger *unilogger.Logger) *Config {
values: make(map[string]string),
temporalValues: make(map[string]*TemporalValue),
errors: make(map[string]error),
logEntry: logger.With(slog.String("component", "runtimeConfig")),
logger: logger.With(slog.String("component", "runtimeConfig")),
}
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (c *Config) expireOverrides() {

for _, expire := range expires {
name, oldValue, newValue := expire[0], expire[1], expire[2]
c.logEntry.Debug("Parameter is expired", slog.String("parameter", name))
c.logger.Debug("Parameter is expired", slog.String("parameter", name))
c.callOnChange(name, oldValue, newValue)
}
}
Expand All @@ -267,7 +267,7 @@ func (c *Config) callOnChange(name string, oldValue string, newValue string) {
}
err := c.params[name].onChange(oldValue, newValue)
if err != nil {
c.logEntry.Error("OnChange handler failed for parameter during value change values",
c.logger.Error("OnChange handler failed for parameter during value change values",
slog.String("parameter", name), slog.String("old_value", oldValue), slog.String("new_value", newValue), slog.String("error", err.Error()))
}
c.m.Lock()
Expand Down
79 changes: 66 additions & 13 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand All @@ -13,7 +14,6 @@ import (
"time"

"github.com/flant/shell-operator/pkg/unilogger"
log "github.com/flant/shell-operator/pkg/unilogger"
utils "github.com/flant/shell-operator/pkg/utils/labels"

"github.com/flant/shell-operator/pkg/app"
Expand All @@ -28,7 +28,7 @@ type CmdUsage struct {
func Run(cmd *exec.Cmd) error {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
unilogger.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)

return cmd.Run()
}
Expand All @@ -52,6 +52,7 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *unilogge
if len(stdErr.Bytes()) > 0 {
return nil, fmt.Errorf("%s", stdErr.String())
}

return nil, err
}

Expand All @@ -61,6 +62,7 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string, logger *unilogge
Sys: cmd.ProcessState.SystemTime(),
User: cmd.ProcessState.UserTime(),
}

// FIXME Maxrss is Unix specific.
sysUsage := cmd.ProcessState.SysUsage()
if v, ok := sysUsage.(*syscall.Rusage); ok {
Expand All @@ -80,21 +82,18 @@ type proxyLogger struct {
buf []byte
}

func (pl *proxyLogger) Write(p []byte) (n int, err error) {
func (pl *proxyLogger) Write(p []byte) (int, error) {
if !pl.logProxyHookJSON {
str := strings.TrimSpace(string(p))

if str != "" {
pl.logger.Log(context.Background(), unilogger.LevelInfo.Level(), str)
}
pl.writerScanner(p)

return len(p), nil
}

// join all parts of json
pl.buf = append(pl.buf, p...)

var line interface{}
err = json.Unmarshal(pl.buf, &line)
err := json.Unmarshal(pl.buf, &line)
if err != nil {
if err.Error() == "unexpected end of JSON input" {
return len(p), nil
Expand All @@ -109,25 +108,79 @@ func (pl *proxyLogger) Write(p []byte) (n int, err error) {
}()

if !ok {
pl.logger.Debugf("json log line not map[string]interface{}: %v", line)
pl.logger.Debug("json log line not map[string]interface{}", slog.Any("line", line))

// fall back to using the logger
pl.logger.Info(string(p))

return len(p), err
}

logEntry := pl.logger.With(app.ProxyJsonLogKey, true)
logger := pl.logger.With(app.ProxyJsonLogKey, true)

logLineRaw, _ := json.Marshal(logMap)
logLine := string(logLineRaw)

if len(logLine) > 10000 {
logLine = fmt.Sprintf("%s:truncated", string(logLine[:10000]))

logger.Log(context.Background(), unilogger.LevelFatal.Level(), "hook result", slog.Any("hook", map[string]any{
"truncated": logLine,
}))

return len(p), nil
}

// logEntry.Log(log.FatalLevel, string(logLine))
logEntry.Log(context.Background(), unilogger.LevelFatal.Level(), "hook result", slog.Any("hook", logMap))
logger.Log(context.Background(), unilogger.LevelFatal.Level(), "hook result", slog.Any("hook", logMap))

return len(p), nil
}

func (pl *proxyLogger) writerScanner(p []byte) {
scanner := bufio.NewScanner(bytes.NewReader(p))

// Set the buffer size to the maximum token size to avoid buffer overflows
scanner.Buffer(make([]byte, bufio.MaxScanTokenSize), bufio.MaxScanTokenSize)

// Define a split function to split the input into chunks of up to 64KB
chunkSize := bufio.MaxScanTokenSize // 64KB
splitFunc := func(data []byte, atEOF bool) (int, []byte, error) {
if len(data) >= chunkSize {
return chunkSize, data[:chunkSize], nil
}

return bufio.ScanLines(data, atEOF)
}

// Use the custom split function to split the input
scanner.Split(splitFunc)

// Scan the input and write it to the logger using the specified print function
for scanner.Scan() {
// prevent empty logging
str := strings.TrimSpace(scanner.Text())
if str == "" {
continue
}

if len(str) > 10000 {
str = fmt.Sprintf("%s:truncated", str[:10000])
}

pl.logger.Info(str)
}

// If there was an error while scanning the input, log an error
if err := scanner.Err(); err != nil {
pl.logger.Error("reading from scanner", slog.String("error", err.Error()))
}
}

func Output(cmd *exec.Cmd) (output []byte, err error) {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
unilogger.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)
output, err = cmd.Output()
return
}
Expand Down
76 changes: 63 additions & 13 deletions pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"os"
"os/exec"
"regexp"
"testing"
"time"

Expand All @@ -17,49 +18,83 @@ import (
)

func TestRunAndLogLines(t *testing.T) {
logger := unilogger.NewLogger(unilogger.Options{})
loggerOpts := unilogger.Options{
TimeFunc: func(_ time.Time) time.Time {
parsedTime, err := time.Parse(time.DateTime, "2006-01-02 15:04:05")
if err != nil {
assert.NoError(t, err)
}

return parsedTime
},
}
logger := unilogger.NewLogger(loggerOpts)
logger.SetLevel(unilogger.LevelInfo)

var buf bytes.Buffer
logger.SetOutput(&buf)

t.Run("simple log", func(t *testing.T) {
app.LogProxyHookJSON = true
// time="2023-07-10T18:13:42+04:00" level=fatal msg="{\"a\":\"b\",\"foo\":\"baz\",\"output\":\"stdout\"}" a=b output=stdout proxyJsonLog=true

cmd := exec.Command("echo", `{"foo": "baz"}`)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `"level":"fatal","msg":"hook result","hook":{"foo":"baz"},"output":"stdout","proxyJsonLog":true`)

assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","hook":{"foo":"baz"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("not json log", func(t *testing.T) {
app.LogProxyHookJSON = false
// time="2023-07-10T18:14:25+04:00" level=info msg=foobar a=b output=stdout
cmd := exec.Command("echo", `foobar`)

_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
time.Sleep(100 * time.Millisecond)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `"level":"info","msg":"foobar","output":"stdout"`)

assert.Equal(t, buf.String(), `{"level":"info","msg":"foobar","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

// TODO: check test
t.Run("long file", func(t *testing.T) {
t.Run("long file must be truncated", func(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "testjson-*.json")
require.NoError(t, err)

defer os.RemoveAll(f.Name())

_, _ = io.WriteString(f, `{"foo": "`+randStringRunes(1024*1024)+`"}`)

app.LogProxyHookJSON = true
cmd := exec.Command("cat", f.Name())

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
assert.NoError(t, err)

reg := regexp.MustCompile(`{"level":"fatal","msg":"hook result","hook":{"truncated":".*:truncated"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"`)
assert.Regexp(t, reg, buf.String())

buf.Reset()
})

t.Run("long file non json must be truncated", func(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "testjson-*.json")
require.NoError(t, err)

defer os.RemoveAll(f.Name())

_, _ = io.WriteString(f, `result `+randStringRunes(1024*1024))

app.LogProxyHookJSON = false
cmd := exec.Command("cat", f.Name())

_, err = RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
assert.NoError(t, err)
// assert.Equal(t, buf.String(), `\",\"output\":\"stdout\"}" a=b output=stdout proxyJsonLog=true`)
assert.Contains(t, buf.String(), `"level":"fatal","msg":"hook result","hook":{"foo":`)

reg := regexp.MustCompile(`{"level":"info","msg":"result .*:truncated","output":"stdout","time":"2006-01-02T15:04:05Z"`)
assert.Regexp(t, reg, buf.String())

buf.Reset()
})
Expand All @@ -70,8 +105,9 @@ func TestRunAndLogLines(t *testing.T) {
cmd := exec.Command("echo", `["a","b","c"]`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"}, logger)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `"level":"debug","msg":"json log line not map[string]interface{}: [a b c]"`)
assert.Contains(t, buf.String(), `"output":"stdout"`)
assert.Equal(t, buf.String(), `{"level":"debug","msg":"Executing command 'echo [\"a\",\"b\",\"c\"]' in '' dir","source":"executor/executor.go:43","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"debug","msg":"json log line not map[string]interface{}","source":"executor/executor.go:111","line":["a","b","c"],"output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"[\"a\",\"b\",\"c\"]\n","source":"executor/executor.go:114","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})
Expand All @@ -85,7 +121,21 @@ func TestRunAndLogLines(t *testing.T) {
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `"level":"fatal","msg":"hook result","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true`)
assert.Equal(t, buf.String(), `{"level":"fatal","msg":"hook result","hook":{"a":"b","c":"d"},"output":"stdout","proxyJsonLog":true,"time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})

t.Run("multiline non json", func(t *testing.T) {
app.LogProxyHookJSON = false
cmd := exec.Command("echo", `
a b
c d
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"}, logger)
assert.NoError(t, err)
assert.Equal(t, buf.String(), `{"level":"info","msg":"a b","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n"+
`{"level":"info","msg":"c d","output":"stdout","time":"2006-01-02T15:04:05Z"}`+"\n")

buf.Reset()
})
Expand Down
Loading

0 comments on commit 85b478e

Please sign in to comment.