From fe2c53255ed0abfbea455c2358ec144c01b9a608 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 26 Dec 2016 18:17:01 -0500 Subject: [PATCH 1/5] WIP --- average/Average.go | 3 ++- count/Count.go | 1 + group/Grouper.go | 10 +++++++--- render/Render.go | 9 ++++++--- search/Main.go | 5 ++++- sum/Sum.go | 2 +- sumo/Main.go | 14 +++++++++++--- util/Raw.go | 18 ++++++++---------- 8 files changed, 40 insertions(+), 22 deletions(-) diff --git a/average/Average.go b/average/Average.go index 590d33a..e600160 100644 --- a/average/Average.go +++ b/average/Average.go @@ -9,6 +9,7 @@ import ( ) const Avg = "_avg" + type average struct { samples *int sum *float64 @@ -51,7 +52,7 @@ func Build(args []string) (util.SumoAggOperator, error) { func (avg average) Flush() { avg.mu.Lock() defer avg.mu.Unlock() - if *avg.samples > 0 { + if *avg.samples > 0 { avg.output(util.CreateStartRelation()) avg.output(util.CreateRelation(currentState(avg))) avg.output(util.CreateEndRelation()) diff --git a/count/Count.go b/count/Count.go index e4dbd6e..48ae337 100644 --- a/count/Count.go +++ b/count/Count.go @@ -7,6 +7,7 @@ import ( ) const Count = "_count" + type count struct { ct *int base map[string]interface{} diff --git a/group/Grouper.go b/group/Grouper.go index 665c769..ed4a183 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -15,7 +15,7 @@ type Grouper struct { merger Merger by []string key string - mu *sync.Mutex + mu *sync.Mutex } type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator @@ -115,9 +115,13 @@ func (m Merger) Write(inp map[string]interface{}) { m.Process(inp) } - func (m Merger) Flush() { m.mu.Lock() + defer m.mu.Unlock() + if len(m.aggregate) == 0 { + fmt.Println("No aggregate") + return + } m.output.Write(util.CreateStartRelationMeta("merger")) // Output keys sorted by index so the ui is consistent if m.sortCol == "" { @@ -138,8 +142,8 @@ func (m Merger) Flush() { m.output.Write(util.CreateEndRelation()) queryString := strings.Join(os.Args[0:], " ") m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString})) - m.mu.Unlock() } + func flush(m Merger, ticker *time.Ticker) { for { select { diff --git a/render/Render.go b/render/Render.go index c5ffa95..4421e23 100644 --- a/render/Render.go +++ b/render/Render.go @@ -15,7 +15,7 @@ type Renderer struct { height int64 width int64 rowsPrinted *int64 - inRelation *bool + inRelation *bool } func main() { @@ -74,7 +74,7 @@ func (r Renderer) Process(inp map[string]interface{}) { fmt.Print("\n") } if util.IsStartRelation(inp) { - if (*r.inRelation) { + if *r.inRelation { panic("Already in relation") } *r.inRelation = true @@ -94,7 +94,10 @@ func (r Renderer) Process(inp map[string]interface{}) { fmt.Printf("\n") } if util.IsRelation(inp) { - if (!*r.inRelation) { + if *r.rowsPrinted >= 10 { + return + } + if !*r.inRelation { panic("Can't get relation before StartRelation") } colsWidth := render.Columns([]map[string]interface{}{inp}) diff --git a/search/Main.go b/search/Main.go index 16cff5d..3608953 100644 --- a/search/Main.go +++ b/search/Main.go @@ -4,7 +4,10 @@ import "os" import "bufio" import "io" import "github.com/SumoLogic/sumoshell/util" -import "strings" +import ( + "strings" + "fmt" +) func BuildAndConnect(args []string) { if len(args[1:]) > 0 { diff --git a/sum/Sum.go b/sum/Sum.go index 1ad14ca..66fbe64 100644 --- a/sum/Sum.go +++ b/sum/Sum.go @@ -14,7 +14,7 @@ type sum struct { // DO NOT MODIFY BASE base map[string]interface{} output func(map[string]interface{}) - mu *sync.Mutex + mu *sync.Mutex } const Sum = "_sum" diff --git a/sumo/Main.go b/sumo/Main.go index 1a5b6ed..c026a8b 100644 --- a/sumo/Main.go +++ b/sumo/Main.go @@ -12,6 +12,7 @@ import ( "github.com/SumoLogic/sumoshell/util" "os" "time" + "sync" ) type Builder func([]string) (util.SumoOperator, error) @@ -30,7 +31,6 @@ var aggOperators = map[string]AggBuilder{ } func main() { - args := os.Args if len(args) == 1 { fmt.Println("Arguments expected") @@ -66,8 +66,14 @@ func connectAggOperator(selector string, args []string) bool { fmt.Println(err) } else { ticker := time.NewTicker(100 * time.Millisecond) - go flush(aggOperator, ticker) + done := make(chan bool) + var wg = sync.WaitGroup{} + wg.Add(1) + go flush(aggOperator, ticker, done) util.ConnectToStdIn(aggOperator) + ticker.Stop() + done <- true + wg.Wait() // Flush when the stream completes to ensure all data is accounted for aggOperator.Flush() } @@ -91,11 +97,13 @@ func handleErrorOrWire(operator util.SumoOperator, err error) { } } -func flush(aggOp util.SumoAggOperator, ticker *time.Ticker) { +func flush(aggOp util.SumoAggOperator, ticker *time.Ticker, done chan bool) { for { select { case <-ticker.C: aggOp.Flush() + case <- done: + return } } } diff --git a/util/Raw.go b/util/Raw.go index abba581..1e10dd0 100644 --- a/util/Raw.go +++ b/util/Raw.go @@ -9,9 +9,9 @@ import "os" import "bufio" import "log" import ( + "sort" "strconv" "sync" - "sort" ) type RawInputHandler struct { @@ -151,17 +151,12 @@ func NewRawInputHandler(inp io.Writer) *RawInputHandler { return &RawInputHandler{inp, []rune{}} } -func NewRawInputHandlerStdout() *RawInputHandler { - return NewRawInputHandler(os.Stdout) -} - func (handler *RawInputHandler) Flush() { m := make(map[string]interface{}) m[Raw] = string(handler.buff) m[Type] = Plus handler.buff = []rune{} b, err := json.Marshal(m) - //fmt.Printf(b) if err != nil { fmt.Printf("ERROR!", err) } else { @@ -172,7 +167,7 @@ func (handler *RawInputHandler) Flush() { type JsonWriter struct { writer io.Writer - mu *sync.Mutex + mu *sync.Mutex } func NewJsonWriter() *JsonWriter { @@ -198,12 +193,14 @@ func CoerceNumber(v interface{}) (float64, error) { type Datum []map[string]interface{} type By func(p1, p2 *map[string]interface{}) bool -func (a datumSorter) Len() int { return len(a.data) } -func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] } + +func (a datumSorter) Len() int { return len(a.data) } +func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] } + // planetSorter joins a By function and a slice of Planets to be sorted. type datumSorter struct { data Datum - by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method. + by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method. } func (a datumSorter) Less(i, j int) bool { @@ -216,6 +213,7 @@ func SortByField(field string, data Datum) { v2, err2 := CoerceNumber(p2[field]) if err1 != nil || err2 != nil { + fmt.Print(data) panic(err1) } From 3a6750d42f5b8744f67d7c3574618bc62ac41ffa Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 26 Dec 2016 18:39:37 -0500 Subject: [PATCH 2/5] Fix first-header-row bug --- group/Grouper.go | 4 ---- render/Render.go | 25 ++++++++++++++++++------- search/Main.go | 1 - sumo/Main.go | 7 ++----- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/group/Grouper.go b/group/Grouper.go index ed4a183..27c6454 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -118,10 +118,6 @@ func (m Merger) Write(inp map[string]interface{}) { func (m Merger) Flush() { m.mu.Lock() defer m.mu.Unlock() - if len(m.aggregate) == 0 { - fmt.Println("No aggregate") - return - } m.output.Write(util.CreateStartRelationMeta("merger")) // Output keys sorted by index so the ui is consistent if m.sortCol == "" { diff --git a/render/Render.go b/render/Render.go index 4421e23..9b1427b 100644 --- a/render/Render.go +++ b/render/Render.go @@ -85,16 +85,13 @@ func (r Renderer) Process(inp map[string]interface{}) { fmt.Printf("\033[1A") } *r.rowsPrinted = 0 - for _, col := range *r.cols { - width := (*r.colWidths)[col] - spaces := strings.Repeat(" ", width-len(col)) - fmt.Printf("%v%s", col, spaces) + if len(*r.cols) > 0 { + r.printHeader() } - *r.rowsPrinted += 1 - fmt.Printf("\n") } if util.IsRelation(inp) { - if *r.rowsPrinted >= 10 { + // If we haven't printed the header yet + if *r.rowsPrinted >= 20 { return } if !*r.inRelation { @@ -104,6 +101,9 @@ func (r Renderer) Process(inp map[string]interface{}) { colNames := render.ColumnNames(colsWidth) *r.cols = colNames *r.colWidths = colsWidth + if *r.rowsPrinted == 0 { + r.printHeader() + } for _, col := range colNames { v, _ := inp[col] vStr := fmt.Sprint(v) @@ -118,3 +118,14 @@ func (r Renderer) Process(inp map[string]interface{}) { *r.inRelation = false } } + +func (r Renderer) printHeader() { + for _, col := range *r.cols { + width := (*r.colWidths)[col] + spaces := strings.Repeat(" ", width-len(col)) + fmt.Printf("%v%s", col, spaces) + } + *r.rowsPrinted += 1 + fmt.Printf("\n") + +} diff --git a/search/Main.go b/search/Main.go index 3608953..a5b03be 100644 --- a/search/Main.go +++ b/search/Main.go @@ -6,7 +6,6 @@ import "io" import "github.com/SumoLogic/sumoshell/util" import ( "strings" - "fmt" ) func BuildAndConnect(args []string) { diff --git a/sumo/Main.go b/sumo/Main.go index c026a8b..b1c617c 100644 --- a/sumo/Main.go +++ b/sumo/Main.go @@ -12,7 +12,7 @@ import ( "github.com/SumoLogic/sumoshell/util" "os" "time" - "sync" + // "sync" ) type Builder func([]string) (util.SumoOperator, error) @@ -67,13 +67,10 @@ func connectAggOperator(selector string, args []string) bool { } else { ticker := time.NewTicker(100 * time.Millisecond) done := make(chan bool) - var wg = sync.WaitGroup{} - wg.Add(1) go flush(aggOperator, ticker, done) util.ConnectToStdIn(aggOperator) ticker.Stop() done <- true - wg.Wait() // Flush when the stream completes to ensure all data is accounted for aggOperator.Flush() } @@ -102,7 +99,7 @@ func flush(aggOp util.SumoAggOperator, ticker *time.Ticker, done chan bool) { select { case <-ticker.C: aggOp.Flush() - case <- done: + case <-done: return } } From 3f5d4bc00632f764fa85d879810bc1cc4261f8d8 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sun, 1 Jan 2017 14:22:00 -0500 Subject: [PATCH 3/5] Remove unneeded flushing channel Having a separate merger-flusher meant that there was a race between the merger flushing and the grouper flushing. Since the grouper flush triggers the merger flush we can rely on the grouper to propogate the flush signal. --- group/Grouper.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/group/Grouper.go b/group/Grouper.go index 27c6454..78cec09 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -6,7 +6,6 @@ import ( "os" "strings" "sync" - "time" ) type Grouper struct { @@ -83,8 +82,6 @@ type Merger struct { func NewMerger(sortCol string) Merger { mu := &sync.Mutex{} m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu, sortCol} - ticker := time.NewTicker(100 * time.Millisecond) - go flush(m, ticker) return m } @@ -139,12 +136,3 @@ func (m Merger) Flush() { queryString := strings.Join(os.Args[0:], " ") m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString})) } - -func flush(m Merger, ticker *time.Ticker) { - for { - select { - case <-ticker.C: - m.Flush() - } - } -} From 66bbec967420420c366f4362f8181b2ff99a2669 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sun, 1 Jan 2017 14:27:22 -0500 Subject: [PATCH 4/5] Update README.md --- README.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index dec3349..48c2523 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ `Sumoshell` is collection of utilities to improve analyzing log files written in Go. `grep` can't tell that some log lines span multiple individual lines. Parsing out fields is cumbersome. Aggregating is basically impossible, and there is no good way to view the results. In Sumoshell, each individual command acts as a phase in a pipeline to get the answer you want. Sumoshell brings a lot of the functionality of [Sumo Logic](https://www.sumologic.com) to the command line. Commands should start with -`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` `render-basic` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed. +`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed. ## Installation [OSX and Linux binaries are provided for sumoshell](https://github.com/SumoLogic/sumoshell/releases). Simply extract the archive and place the binaries on your path. @@ -20,7 +20,7 @@ go install ./... ## Usage Like [SumoLogic](https://www.sumologic.com), sumoshell enables you pass log data through a series of transformations to get your final result. Pipelines start with a source (`tail`, `cat`, etc.) followed by the `sumo` operator. An example pipeline might be: -```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render-basic``` +```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render``` This would produce a count of log messages matching `ERROR` by thead. In the basic renderer, the output would look like: ``` @@ -40,9 +40,8 @@ _Id _count thread After using the `sumo` operator, the output will be in JSON. To re-render the output in a human-readable form, `|` the results of your query into one of the three `render` operators. -1. `render-basic`: Capable of rendering aggregate and non-aggregate data. Mimics curses style CLIs by calculating the terminal height and printing new lines to the end to keep your text aligned. Add `nowraw` to drop the raw data when an aggregate isn't present. -2. `render`: Curses based renderer for rendering tabular data. -3. `graph`: Curses based renderer for rendering tabular data as a bar chart. +1. `render`: Capable of rendering aggregate and non-aggregate data. Add `nowraw` to drop the raw data when an aggregate isn't present. Aggregates are updated in place using terminal escape sequences. +2. `graph`: Curses based renderer for rendering tabular data as a bar chart. ### Parsing Data From 9de2b3658d20df8b0e3b3dcfb597739da9bcd46e Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sun, 1 Jan 2017 14:27:47 -0500 Subject: [PATCH 5/5] Delete unused code --- group/Grouper.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/group/Grouper.go b/group/Grouper.go index 78cec09..af476dd 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -96,10 +96,6 @@ func ExtractId(inp map[string]interface{}) int { } } -func WithId(id int) map[string]interface{} { - return map[string]interface{}{Id: id} -} - func (m Merger) Process(inp map[string]interface{}) { m.mu.Lock() defer m.mu.Unlock()