diff --git a/cmd/function.go b/cmd/function.go index 99a0b32..2ccd45e 100644 --- a/cmd/function.go +++ b/cmd/function.go @@ -52,7 +52,7 @@ func NewFunctionCmd() *cobra.Command { taskID = "local" } - _, deferFunc, err := logger.Setup(p, taskID) + control, marker, deferFunc, err := logger.Setup(p, taskID) if err == nil { defer deferFunc() } @@ -135,6 +135,15 @@ func NewFunctionCmd() *cobra.Command { if err := client.Wait(); err != nil { log.Fatal(err) } + // Handle logger flushing + if control != nil { + _, _ = fmt.Fprintf(os.Stderr, "[siderite] waiting for logs to flush\n") + _, _ = fmt.Fprintf(os.Stdout, "%s\n", marker) + select { + case <-control: + case <-time.After(5 * time.Second): + } + } return nil }, } diff --git a/cmd/task.go b/cmd/task.go index f50b97a..b5d37a2 100644 --- a/cmd/task.go +++ b/cmd/task.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "os/exec" + "time" "github.com/iron-io/iron_go3/worker" "github.com/philips-labs/siderite/logger" @@ -52,7 +53,7 @@ func task(parseFlags bool, c chan int) func(cmd *cobra.Command, args []string) e if taskID == "" { taskID = "local" } - done, deferFunc, err := logger.Setup(p, taskID) + control, marker, deferFunc, err := logger.Setup(p, taskID) if err == nil { defer deferFunc() } @@ -88,10 +89,16 @@ func task(parseFlags bool, c chan int) func(cmd *cobra.Command, args []string) e } err = command.Wait() _, _ = fmt.Fprintf(os.Stdout, "[siderite] command result: %v\n", err) - _, _ = fmt.Fprintf(os.Stderr, "[siderite] version %s exit\n", GitCommit) - if done != nil { - done <- true + // Handle logger flushing + if control != nil { + _, _ = fmt.Fprintf(os.Stderr, "[siderite] waiting for logs to flush\n") + _, _ = fmt.Fprintf(os.Stdout, "%s\n", marker) + select { + case <-control: + case <-time.After(5 * time.Second): + } } + _, _ = fmt.Fprintf(os.Stderr, "[siderite] version %s exit\n", GitCommit) return err } } diff --git a/logger/storer.go b/logger/storer.go index 0ec20e7..41b6d39 100644 --- a/logger/storer.go +++ b/logger/storer.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "os" + "strings" "time" "github.com/google/uuid" @@ -16,14 +17,16 @@ type Storer interface { StoreResources(messages []logging.Resource, count int) (*logging.StoreResponse, error) } -func Setup(p models.Payload, taskID string) (chan bool, func(), error) { +func Setup(p models.Payload, taskID string) (chan string, string, func(), error) { var err error - done := make(chan bool) + control := make(chan string) old := os.Stdout // keep backup of the real stdout + marker := fmt.Sprintf("!^?%d?~!", uuid.New().ID()) + r, w, err := os.Pipe() if err != nil { _, _ = fmt.Fprintf(os.Stdout, "Error setting up pipe: %v\n", err) - return done, nil, err + return nil, marker, nil, err } os.Stdout = w @@ -38,7 +41,7 @@ func Setup(p models.Payload, taskID string) (chan bool, func(), error) { } if err != nil { os.Stdout = old // Reset - return nil, func() {}, err + return nil, marker, func() {}, err } err = startStorerWorker(r, storer, logging.Resource{ ResourceType: "LogEvent", @@ -52,54 +55,47 @@ func Setup(p models.Payload, taskID string) (chan bool, func(), error) { OriginatingUser: "siderite", ServerName: "hsdp-function.siderite.ironworker", ServiceName: taskID, - }, done) + }, control, marker) if err != nil { os.Stdout = old _, _ = fmt.Fprintf(os.Stderr, "[siderite] not logging to HSDP: %v\n", err) } else { _, _ = fmt.Fprintf(os.Stderr, "[siderite] logging stdout to HSDP\n") } - return done, func() { + return control, marker, func() { os.Stdout = old }, nil } -func startStorerWorker(fd *os.File, client Storer, template logging.Resource, done chan bool) error { +func startStorerWorker(fd *os.File, client Storer, template logging.Resource, control chan string, marker string) error { fdReader := bufio.NewReader(fd) - go func() { - s := make(chan string) + go func() { _, _ = fmt.Fprintf(os.Stderr, "[siderite] logging worker started\n") for { - go func(queue chan string) { - // Next line - text, err := fdReader.ReadString('\n') - if err != nil { - queue <- fmt.Sprintf("error reading: %v\n", err) - return - } - queue <- text - }(s) + text, err := fdReader.ReadString('\n') + if err != nil { + text = fmt.Sprintf("error reading: %v\n", err) + } - select { - case text := <-s: - // Prepare message - template.ID = uuid.New().String() - template.TransactionID = template.ID - template.LogData.Message = base64.StdEncoding.EncodeToString([]byte(text)) - template.LogTime = time.Now().Format("2006-01-02T15:04:05.000Z07:00") + if strings.Contains(text, marker) { + _, _ = fmt.Fprintf(os.Stderr, "[siderite] received control marker. exiting logger\n") + control <- marker // Notify task/function runner + return + } + // Prepare message + template.ID = uuid.New().String() + template.TransactionID = template.ID + template.LogData.Message = base64.StdEncoding.EncodeToString([]byte(text)) + template.LogTime = time.Now().Format("2006-01-02T15:04:05.000Z07:00") - if text != "" { - resp, err := client.StoreResources([]logging.Resource{ - template, - }, 1) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "error storing: %v [%v]\n", err, resp) - } + if text != "" { + resp, err := client.StoreResources([]logging.Resource{ + template, + }, 1) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error storing: %v [%v]\n", err, resp) } - case <-done: - _, _ = fmt.Fprintf(os.Stderr, "[siderite] exiting logger\n") - return } } }() diff --git a/logger/storer_test.go b/logger/storer_test.go index 2d299e9..36cc5ce 100644 --- a/logger/storer_test.go +++ b/logger/storer_test.go @@ -5,7 +5,9 @@ import ( "net/http" "os" "testing" + "time" + "github.com/google/uuid" "github.com/philips-software/go-hsdp-api/logging" "github.com/stretchr/testify/assert" ) @@ -31,9 +33,14 @@ func (d *dummyStorer) StoreResources(messages []logging.Resource, count int) (*l func TestToHSDP(t *testing.T) { ds := &dummyStorer{t} - done := make(chan bool) + control := make(chan string) + marker := fmt.Sprintf("!^?%d?~!", uuid.New().ID()) - err := startStorerWorker(os.Stdout, ds, logging.Resource{ + r, w, err := os.Pipe() + if !assert.Nil(t, err) { + return + } + err = startStorerWorker(r, ds, logging.Resource{ ResourceType: "LogEvent", ApplicationInstance: "foo", EventID: "1", @@ -45,8 +52,23 @@ func TestToHSDP(t *testing.T) { OriginatingUser: "siderite", ServerName: "iron.io", ServiceName: "foo", - }, done) - done <- true - - assert.Nil(t, err) + }, control, marker) + if !assert.Nil(t, err) { + return + } + // Simulate task/func output + quit := make(chan bool) + go func() { + for { + _, _ = fmt.Fprintf(w, "%s\n", marker) // Immediate trigger + select { + case <-time.After(100 * time.Millisecond): + case <-quit: + return + } + } + }() + data := <-control + quit <- true + assert.Equal(t, marker, data) }