diff --git a/README.md b/README.md
index dec3349..48c2523 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
 `Sumoshell` is collection of utilities to improve analyzing log files written in Go. `grep` can't tell that some log lines span multiple individual lines. Parsing out fields is cumbersome. Aggregating is basically impossible, and there is no good way to view the results. In Sumoshell, each individual command acts as a phase in a pipeline to get the answer you want.
 
 Sumoshell brings a lot of the functionality of [Sumo Logic](https://www.sumologic.com) to the command line. Commands should start with
-`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` `render-basic` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.
+`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.
 
 ## Installation
 [OSX and Linux binaries are provided for sumoshell](https://github.com/SumoLogic/sumoshell/releases). Simply extract the archive and place the binaries on your path.
@@ -20,7 +20,7 @@ go install ./...
 
 ## Usage
 Like [SumoLogic](https://www.sumologic.com), sumoshell enables you pass log data through a series of transformations to get your final result. Pipelines start with a source (`tail`, `cat`, etc.) followed by the `sumo` operator. An example pipeline might be:
-```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render-basic```
+```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render```
 This would produce a count of log messages matching `ERROR` by thead. In the basic renderer, the output would look like:
 ```
@@ -40,9 +40,8 @@
 _Id _count thread
 
 After using the `sumo` operator, the output will be in JSON. To re-render the output in a human-readable form, `|` the results of your query into one of the three `render` operators.
 
-1. `render-basic`: Capable of rendering aggregate and non-aggregate data. Mimics curses style CLIs by calculating the terminal height and printing new lines to the end to keep your text aligned. Add `nowraw` to drop the raw data when an aggregate isn't present.
-2. `render`: Curses based renderer for rendering tabular data.
-3. `graph`: Curses based renderer for rendering tabular data as a bar chart.
+1. `render`: Capable of rendering aggregate and non-aggregate data. Add `nowraw` to drop the raw data when an aggregate isn't present. Aggregates are updated in place using terminal escape sequences.
+2. `graph`: Curses based renderer for rendering tabular data as a bar chart.
 
 ### Parsing Data
diff --git a/average/Average.go b/average/Average.go
index 590d33a..e600160 100644
--- a/average/Average.go
+++ b/average/Average.go
@@ -9,6 +9,7 @@ import (
 )
 
 const Avg = "_avg"
+
 type average struct {
 	samples *int
 	sum     *float64
@@ -51,7 +52,7 @@ func Build(args []string) (util.SumoAggOperator, error) {
 func (avg average) Flush() {
 	avg.mu.Lock()
 	defer avg.mu.Unlock()
-	if *avg.samples > 0 { 
+	if *avg.samples > 0 {
 		avg.output(util.CreateStartRelation())
 		avg.output(util.CreateRelation(currentState(avg)))
 		avg.output(util.CreateEndRelation())
diff --git a/count/Count.go b/count/Count.go
index e4dbd6e..48ae337 100644
--- a/count/Count.go
+++ b/count/Count.go
@@ -7,6 +7,7 @@ import (
 )
 
 const Count = "_count"
+
 type count struct {
 	ct   *int
 	base map[string]interface{}
diff --git a/group/Grouper.go b/group/Grouper.go
index 665c769..af476dd 100644
--- a/group/Grouper.go
+++ b/group/Grouper.go
@@ -6,7 +6,6 @@ import (
 	"os"
 	"strings"
 	"sync"
-	"time"
 )
 
 type Grouper struct {
@@ -15,7 +14,7 @@ type Grouper struct {
 	merger Merger
 	by     []string
 	key    string
-	mu *sync.Mutex
+	mu     *sync.Mutex
 }
 
 type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator
@@ -83,8 +82,6 @@ type Merger struct {
 func NewMerger(sortCol string) Merger {
 	mu := &sync.Mutex{}
 	m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu, sortCol}
-	ticker := time.NewTicker(100 * time.Millisecond)
-	go flush(m, ticker)
 	return m
 }
 
@@ -99,10 +96,6 @@ func ExtractId(inp map[string]interface{}) int {
 	}
 }
 
-func WithId(id int) map[string]interface{} {
-	return map[string]interface{}{Id: id}
-}
-
 func (m Merger) Process(inp map[string]interface{}) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
@@ -115,9 +108,9 @@ func (m Merger) Write(inp map[string]interface{}) {
 	m.Process(inp)
 }
 
-
 func (m Merger) Flush() {
 	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.output.Write(util.CreateStartRelationMeta("merger"))
 	// Output keys sorted by index so the ui is consistent
 	if m.sortCol == "" {
@@ -138,13 +131,4 @@ func (m Merger) Flush() {
 	m.output.Write(util.CreateEndRelation())
 	queryString := strings.Join(os.Args[0:], " ")
 	m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString}))
-	m.mu.Unlock()
-}
-func flush(m Merger, ticker *time.Ticker) {
-	for {
-		select {
-		case <-ticker.C:
-			m.Flush()
-		}
-	}
 }
diff --git a/render/Render.go b/render/Render.go
index c5ffa95..9b1427b 100644
--- a/render/Render.go
+++ b/render/Render.go
@@ -15,7 +15,7 @@ type Renderer struct {
 	height      int64
 	width       int64
 	rowsPrinted *int64
-	inRelation *bool
+	inRelation  *bool
 }
 
 func main() {
@@ -74,7 +74,7 @@ func (r Renderer) Process(inp map[string]interface{}) {
 		fmt.Print("\n")
 	}
 	if util.IsStartRelation(inp) {
-		if (*r.inRelation) {
+		if *r.inRelation {
 			panic("Already in relation")
 		}
 		*r.inRelation = true
@@ -85,22 +85,25 @@
 			fmt.Printf("\033[1A")
 		}
 		*r.rowsPrinted = 0
-		for _, col := range *r.cols {
-			width := (*r.colWidths)[col]
-			spaces := strings.Repeat(" ", width-len(col))
-			fmt.Printf("%v%s", col, spaces)
+		if len(*r.cols) > 0 {
+			r.printHeader()
 		}
-		*r.rowsPrinted += 1
-		fmt.Printf("\n")
 	}
 	if util.IsRelation(inp) {
-		if (!*r.inRelation) {
+		// If we haven't printed the header yet
+		if *r.rowsPrinted >= 20 {
+			return
+		}
+		if !*r.inRelation {
 			panic("Can't get relation before StartRelation")
 		}
 		colsWidth := render.Columns([]map[string]interface{}{inp})
 		colNames := render.ColumnNames(colsWidth)
 		*r.cols = colNames
 		*r.colWidths = colsWidth
+		if *r.rowsPrinted == 0 {
+			r.printHeader()
+		}
 		for _, col := range colNames {
 			v, _ := inp[col]
 			vStr := fmt.Sprint(v)
@@ -115,3 +118,14 @@
 		*r.inRelation = false
 	}
 }
+
+func (r Renderer) printHeader() {
+	for _, col := range *r.cols {
+		width := (*r.colWidths)[col]
+		spaces := strings.Repeat(" ", width-len(col))
+		fmt.Printf("%v%s", col, spaces)
+	}
+	*r.rowsPrinted += 1
+	fmt.Printf("\n")
+
+}
diff --git a/search/Main.go b/search/Main.go
index 16cff5d..a5b03be 100644
--- a/search/Main.go
+++ b/search/Main.go
@@ -4,7 +4,9 @@ import "os"
 import "bufio"
 import "io"
 import "github.com/SumoLogic/sumoshell/util"
-import "strings"
+import (
+	"strings"
+)
 
 func BuildAndConnect(args []string) {
 	if len(args[1:]) > 0 {
diff --git a/sum/Sum.go b/sum/Sum.go
index 1ad14ca..66fbe64 100644
--- a/sum/Sum.go
+++ b/sum/Sum.go
@@ -14,7 +14,7 @@ type sum struct {
 	// DO NOT MODIFY BASE
 	base   map[string]interface{}
 	output func(map[string]interface{})
-	mu *sync.Mutex
+	mu     *sync.Mutex
 }
 
 const Sum = "_sum"
diff --git a/sumo/Main.go b/sumo/Main.go
index 1a5b6ed..b1c617c 100644
--- a/sumo/Main.go
+++ b/sumo/Main.go
@@ -12,6 +12,7 @@ import (
 	"github.com/SumoLogic/sumoshell/util"
 	"os"
 	"time"
+	// "sync"
 )
 
 type Builder func([]string) (util.SumoOperator, error)
@@ -30,7 +31,6 @@ var aggOperators = map[string]AggBuilder{
 }
 
 func main() {
-
 	args := os.Args
 	if len(args) == 1 {
 		fmt.Println("Arguments expected")
@@ -66,8 +66,11 @@ func connectAggOperator(selector string, args []string) bool {
 		fmt.Println(err)
 	} else {
 		ticker := time.NewTicker(100 * time.Millisecond)
-		go flush(aggOperator, ticker)
+		done := make(chan bool)
+		go flush(aggOperator, ticker, done)
 		util.ConnectToStdIn(aggOperator)
+		ticker.Stop()
+		done <- true
 		// Flush when the stream completes to ensure all data is accounted for
 		aggOperator.Flush()
 	}
@@ -91,11 +94,13 @@ func handleErrorOrWire(operator util.SumoOperator, err error) {
 	}
 }
 
-func flush(aggOp util.SumoAggOperator, ticker *time.Ticker) {
+func flush(aggOp util.SumoAggOperator, ticker *time.Ticker, done chan bool) {
 	for {
 		select {
 		case <-ticker.C:
 			aggOp.Flush()
+		case <-done:
+			return
 		}
 	}
 }
diff --git a/util/Raw.go b/util/Raw.go
index abba581..1e10dd0 100644
--- a/util/Raw.go
+++ b/util/Raw.go
@@ -9,9 +9,9 @@ import "os"
 import "bufio"
 import "log"
 import (
+	"sort"
 	"strconv"
 	"sync"
-	"sort"
 )
 
 type RawInputHandler struct {
@@ -151,17 +151,12 @@ func NewRawInputHandler(inp io.Writer) *RawInputHandler {
 	return &RawInputHandler{inp, []rune{}}
 }
 
-func NewRawInputHandlerStdout() *RawInputHandler {
-	return NewRawInputHandler(os.Stdout)
-}
-
 func (handler *RawInputHandler) Flush() {
 	m := make(map[string]interface{})
 	m[Raw] = string(handler.buff)
 	m[Type] = Plus
 	handler.buff = []rune{}
 	b, err := json.Marshal(m)
-	//fmt.Printf(b)
 	if err != nil {
 		fmt.Printf("ERROR!", err)
 	} else {
@@ -172,7 +167,7 @@
 
 type JsonWriter struct {
 	writer io.Writer
-	mu *sync.Mutex
+	mu     *sync.Mutex
 }
 
 func NewJsonWriter() *JsonWriter {
@@ -198,12 +193,14 @@ func CoerceNumber(v interface{}) (float64, error) {
 
 type Datum []map[string]interface{}
 type By func(p1, p2 *map[string]interface{}) bool
-func (a datumSorter) Len() int { return len(a.data) }
-func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] }
+
+func (a datumSorter) Len() int      { return len(a.data) }
+func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] }
+
 // planetSorter joins a By function and a slice of Planets to be sorted.
 type datumSorter struct {
 	data Datum
-	by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method.
+	by   func(p1, p2 map[string]interface{}) bool // Closure used in the Less method.
 }
 
 func (a datumSorter) Less(i, j int) bool {
@@ -216,6 +213,7 @@ func SortByField(field string, data Datum) {
 	v2, err2 := CoerceNumber(p2[field])
 
 	if err1 != nil || err2 != nil {
+		fmt.Print(data)
 		panic(err1)
 	}
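
The sumo/Main.go hunk above replaces the fire-and-forget flush goroutine with one that can be stopped through a `done` channel once stdin is drained, so the final `Flush()` is the last write. A minimal sketch of that ticker-plus-done pattern, using a stand-in `flusher` type rather than the real `util.SumoAggOperator`:

```go
package main

import (
	"fmt"
	"time"
)

// flusher stands in for the Flush side of util.SumoAggOperator; illustrative only.
type flusher interface {
	Flush()
}

// printFlusher is a toy aggregate that just reports how often it was flushed.
type printFlusher struct{ n int }

func (p *printFlusher) Flush() {
	p.n++
	fmt.Println("flush #", p.n)
}

// periodicFlush emits intermediate results on every tick and exits cleanly
// once the done channel is signalled, mirroring flush() in sumo/Main.go.
func periodicFlush(f flusher, ticker *time.Ticker, done chan bool) {
	for {
		select {
		case <-ticker.C:
			f.Flush()
		case <-done:
			return
		}
	}
}

func main() {
	agg := &printFlusher{}
	ticker := time.NewTicker(100 * time.Millisecond)
	done := make(chan bool)
	go periodicFlush(agg, ticker, done)

	// Stand-in for util.ConnectToStdIn draining the input stream.
	time.Sleep(350 * time.Millisecond)

	ticker.Stop()
	done <- true // stop the background goroutine before the final flush
	agg.Flush()  // flush once more so the tail of the stream is accounted for
}
```

Stopping the goroutine before the final flush also avoids the interleaved output the removed background flusher in group/Grouper.go could produce.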
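The README change says aggregates are now updated in place using terminal escape sequences, and render/Render.go does this by printing `"\033[1A"` (cursor up one line) before redrawing. A rough, self-contained sketch of that redraw-in-place idea, assuming a VT100-compatible terminal; the table contents are fabricated:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	threads := []string{"thread-1", "thread-2", "thread-3"}
	linesPrinted := 0
	for tick := 1; tick <= 5; tick++ {
		// Rewind the cursor over the block printed last time, one line at a
		// time, the same way render/Render.go rewinds with "\033[1A" per row.
		for i := 0; i < linesPrinted; i++ {
			fmt.Printf("\033[1A")
		}
		linesPrinted = 0

		fmt.Printf("%-12s%s\n", "thread", "_count")
		linesPrinted++
		for i, t := range threads {
			fmt.Printf("%-12s%d\n", t, tick*(i+1)) // fabricated counts, just to show movement
			linesPrinted++
		}
		time.Sleep(300 * time.Millisecond)
	}
}
```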
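util/Raw.go sorts rows with a `datumSorter` that pairs a `Datum` slice with a comparison closure (the `By` type), the same closure-based pattern the sort package documentation demonstrates with its planet sorter (hence the leftover comment in the hunk above). A standalone sketch of that pattern with hypothetical rows and simplified types, not the exact sumoshell signatures:

```go
package main

import (
	"fmt"
	"sort"
)

// datum mirrors the shape sumoshell passes around: one map per row.
type datum []map[string]interface{}

// lessFunc is a comparison closure, like util.By, consumed by Less below.
type lessFunc func(p1, p2 map[string]interface{}) bool

// datumSorter joins a comparison closure and the slice to be sorted.
type datumSorter struct {
	data datum
	by   lessFunc
}

func (a datumSorter) Len() int           { return len(a.data) }
func (a datumSorter) Swap(i, j int)      { a.data[i], a.data[j] = a.data[j], a.data[i] }
func (a datumSorter) Less(i, j int) bool { return a.by(a.data[i], a.data[j]) }

func main() {
	rows := datum{
		{"thread": "b", "_count": 3.0},
		{"thread": "a", "_count": 7.0},
		{"thread": "c", "_count": 1.0},
	}
	// Sort descending by _count, the column a count aggregate would produce.
	sort.Sort(datumSorter{data: rows, by: func(p1, p2 map[string]interface{}) bool {
		return p1["_count"].(float64) > p2["_count"].(float64)
	}})
	fmt.Println(rows)
}
```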