Skip to content

Commit

Permalink
Logger: implement flushing mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
loafoe committed Nov 27, 2021
1 parent 4c7230f commit 50216e2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 46 deletions.
11 changes: 10 additions & 1 deletion cmd/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewFunctionCmd() *cobra.Command {
taskID = "local"
}

_, deferFunc, err := logger.Setup(p, taskID)
control, marker, deferFunc, err := logger.Setup(p, taskID)
if err == nil {
defer deferFunc()
}
Expand Down Expand Up @@ -135,6 +135,15 @@ func NewFunctionCmd() *cobra.Command {
if err := client.Wait(); err != nil {
log.Fatal(err)
}
// Handle logger flushing
if control != nil {
_, _ = fmt.Fprintf(os.Stderr, "[siderite] waiting for logs to flush\n")
_, _ = fmt.Fprintf(os.Stdout, "%s\n", marker)
select {
case <-control:
case <-time.After(5 * time.Second):
}
}
return nil
},
}
Expand Down
15 changes: 11 additions & 4 deletions cmd/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"time"

"github.com/iron-io/iron_go3/worker"
"github.com/philips-labs/siderite/logger"
Expand Down Expand Up @@ -52,7 +53,7 @@ func task(parseFlags bool, c chan int) func(cmd *cobra.Command, args []string) e
if taskID == "" {
taskID = "local"
}
done, deferFunc, err := logger.Setup(p, taskID)
control, marker, deferFunc, err := logger.Setup(p, taskID)
if err == nil {
defer deferFunc()
}
Expand Down Expand Up @@ -88,10 +89,16 @@ func task(parseFlags bool, c chan int) func(cmd *cobra.Command, args []string) e
}
err = command.Wait()
_, _ = fmt.Fprintf(os.Stdout, "[siderite] command result: %v\n", err)
_, _ = fmt.Fprintf(os.Stderr, "[siderite] version %s exit\n", GitCommit)
if done != nil {
done <- true
// Handle logger flushing
if control != nil {
_, _ = fmt.Fprintf(os.Stderr, "[siderite] waiting for logs to flush\n")
_, _ = fmt.Fprintf(os.Stdout, "%s\n", marker)
select {
case <-control:
case <-time.After(5 * time.Second):
}
}
_, _ = fmt.Fprintf(os.Stderr, "[siderite] version %s exit\n", GitCommit)
return err
}
}
66 changes: 31 additions & 35 deletions logger/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"fmt"
"os"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -16,14 +17,16 @@ type Storer interface {
StoreResources(messages []logging.Resource, count int) (*logging.StoreResponse, error)
}

func Setup(p models.Payload, taskID string) (chan bool, func(), error) {
func Setup(p models.Payload, taskID string) (chan string, string, func(), error) {
var err error
done := make(chan bool)
control := make(chan string)
old := os.Stdout // keep backup of the real stdout
marker := fmt.Sprintf("!^?%d?~!", uuid.New().ID())

r, w, err := os.Pipe()
if err != nil {
_, _ = fmt.Fprintf(os.Stdout, "Error setting up pipe: %v\n", err)
return done, nil, err
return nil, marker, nil, err
}
os.Stdout = w

Expand All @@ -38,7 +41,7 @@ func Setup(p models.Payload, taskID string) (chan bool, func(), error) {
}
if err != nil {
os.Stdout = old // Reset
return nil, func() {}, err
return nil, marker, func() {}, err
}
err = startStorerWorker(r, storer, logging.Resource{
ResourceType: "LogEvent",
Expand All @@ -52,54 +55,47 @@ func Setup(p models.Payload, taskID string) (chan bool, func(), error) {
OriginatingUser: "siderite",
ServerName: "hsdp-function.siderite.ironworker",
ServiceName: taskID,
}, done)
}, control, marker)
if err != nil {
os.Stdout = old
_, _ = fmt.Fprintf(os.Stderr, "[siderite] not logging to HSDP: %v\n", err)
} else {
_, _ = fmt.Fprintf(os.Stderr, "[siderite] logging stdout to HSDP\n")
}
return done, func() {
return control, marker, func() {
os.Stdout = old
}, nil
}

func startStorerWorker(fd *os.File, client Storer, template logging.Resource, done chan bool) error {
func startStorerWorker(fd *os.File, client Storer, template logging.Resource, control chan string, marker string) error {
fdReader := bufio.NewReader(fd)
go func() {
s := make(chan string)

go func() {
_, _ = fmt.Fprintf(os.Stderr, "[siderite] logging worker started\n")
for {
go func(queue chan string) {
// Next line
text, err := fdReader.ReadString('\n')
if err != nil {
queue <- fmt.Sprintf("error reading: %v\n", err)
return
}
queue <- text
}(s)
text, err := fdReader.ReadString('\n')
if err != nil {
text = fmt.Sprintf("error reading: %v\n", err)
}

select {
case text := <-s:
// Prepare message
template.ID = uuid.New().String()
template.TransactionID = template.ID
template.LogData.Message = base64.StdEncoding.EncodeToString([]byte(text))
template.LogTime = time.Now().Format("2006-01-02T15:04:05.000Z07:00")
if strings.Contains(text, marker) {
_, _ = fmt.Fprintf(os.Stderr, "[siderite] received control marker. exiting logger\n")
control <- marker // Notify task/function runner
return
}
// Prepare message
template.ID = uuid.New().String()
template.TransactionID = template.ID
template.LogData.Message = base64.StdEncoding.EncodeToString([]byte(text))
template.LogTime = time.Now().Format("2006-01-02T15:04:05.000Z07:00")

if text != "" {
resp, err := client.StoreResources([]logging.Resource{
template,
}, 1)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error storing: %v [%v]\n", err, resp)
}
if text != "" {
resp, err := client.StoreResources([]logging.Resource{
template,
}, 1)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error storing: %v [%v]\n", err, resp)
}
case <-done:
_, _ = fmt.Fprintf(os.Stderr, "[siderite] exiting logger\n")
return
}
}
}()
Expand Down
34 changes: 28 additions & 6 deletions logger/storer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"net/http"
"os"
"testing"
"time"

"github.com/google/uuid"
"github.com/philips-software/go-hsdp-api/logging"
"github.com/stretchr/testify/assert"
)
Expand All @@ -31,9 +33,14 @@ func (d *dummyStorer) StoreResources(messages []logging.Resource, count int) (*l

func TestToHSDP(t *testing.T) {
ds := &dummyStorer{t}
done := make(chan bool)
control := make(chan string)
marker := fmt.Sprintf("!^?%d?~!", uuid.New().ID())

err := startStorerWorker(os.Stdout, ds, logging.Resource{
r, w, err := os.Pipe()
if !assert.Nil(t, err) {
return
}
err = startStorerWorker(r, ds, logging.Resource{
ResourceType: "LogEvent",
ApplicationInstance: "foo",
EventID: "1",
Expand All @@ -45,8 +52,23 @@ func TestToHSDP(t *testing.T) {
OriginatingUser: "siderite",
ServerName: "iron.io",
ServiceName: "foo",
}, done)
done <- true

assert.Nil(t, err)
}, control, marker)
if !assert.Nil(t, err) {
return
}
// Simulate task/func output
quit := make(chan bool)
go func() {
for {
_, _ = fmt.Fprintf(w, "%s\n", marker) // Immediate trigger
select {
case <-time.After(100 * time.Millisecond):
case <-quit:
return
}
}
}()
data := <-control
quit <- true
assert.Equal(t, marker, data)
}

0 comments on commit 50216e2

Please sign in to comment.